GVKun编程网logo

创建已经完成的CompletableFuture的正确方法是什么(创建的件)

12

本文的目的是介绍创建已经完成的CompletableFuture的正确方法是什么的详细情况,特别关注创建的件的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解创建已经完

本文的目的是介绍创建已经完成的CompletableFuture的正确方法是什么的详细情况,特别关注创建的件的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解创建已经完成的CompletableFuture的正确方法是什么的机会,同时也不会遗漏关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()、CompletableFuture CompletableFuture.supplyAsync 异常处理的知识。

本文目录一览:

创建已经完成的CompletableFuture的正确方法是什么(创建的件)

创建已经完成的CompletableFuture的正确方法是什么(创建的件)

我正在Java 8中使用Completable futures,并且我想编写一种方法,该方法基于接收到的参数并行运行多个具有副作用的任务,然后返回其“组合”
future(使用CompletableFuture.allOf()),或者什么都不做,然后返回已经完成的未来。

但是,allOf返回一个CompletableFuture<Void>

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

创建已知的已经完成的未来的唯一方法是使用completedFuture(),它需要一个值:

public static <U> CompletableFuture<U> completedFuture(U value)

返回一个已经用给定值完成的新CompletableFuture。

并且Void是无法实例化的,因此我需要另一种方法来创建type的已经完成的future CompletableFuture<Void>

做这个的最好方式是什么?

答案1

小编典典

由于Void无法实例化,因此只能完成一个CompletableFuture<Void>带有null结果的a
,这恰好是您调用成功完成join()后返回的allOf()Future 时也会得到的结果。

所以你可以使用

CompletableFuture<Void> cf = CompletableFuture.completedFuture(null);

得到这样一个已经完成的未来。

但是你也可以使用

CompletableFuture<Void> cf = CompletableFuture.allOf();

表示结果不依赖任何作业。结果将完全相同。

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

一、概述

  创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

  详述:https://www.cnblogs.com/bjlhx/p/7588971.html

1.1、Runnable 接口

它是一个接口,里面只声明了一个 run () 方法:

public interface Runnable {
    public abstract void run();
}

由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。

1.2、Callable 接口

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。

public interface Callable<V> {   
    V call() throws Exception;
}

是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。

1.3、Future 接口

  Future 的核心思想是:

    一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。

  Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:

    get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

    cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。

    isDone 方法:判断是否计算完

    isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  也就是说 Future 提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

  Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

使用 Callable+Future 获取执行结果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.4、FutureTask 类

FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。

如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:

把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。

1.4.1、FutureTask 结构

  

1.4.2、FutureTask 使用

方式一、使用 thread 方式

  FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动

public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("计算线程正在计算结果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main线程干点别的...");

        Integer result = task.get();
        System.out.println("从计算线程拿到的结果为:" + result);
    }

方式二、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式

public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);
        //同时开启了两个任务
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 关闭线程池和服务
                    return;
                }

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用 Callable+FutureTask 获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.5、CompletionService

原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用内部阻塞队列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("总耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}
View Code

建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。

二、CompletableFuture

2.1、对标 Futrue

  Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

  阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。

    Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。

2.2、类图

  

2.2.1、CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.2.2、Future

2.3、创建 CompletableFuture 对象

  CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.

  以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。

  runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。

  supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。

  注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。

示例:简单同步用法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //长时间的计算任务
            try {
                System.out.println("计算型任务开始");
                Thread.sleep(2000);
                return "计算型任务结束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.4、计算结果完成时的处理

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

  方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.5、转换

CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.6、异常处理 completeExceptionally

  为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

  这样,当执行任务发生异常时,调用 get() 方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的 Exception 参数。

/**
     * 任务没有异常 正常执行,然后结束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

    /**
     * 线程有异常  正常执行,然后无法结束,主线程会一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
View Code
/**
     * 线程有异常  正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
     * 主线程接收到 程序继续处理,至结束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

2.7、多任务组合方法 allOf 和 anyOf

allOf 是等待所有任务完成,构造后 CompletableFuture 完成

anyOf 是只要有一个任务完成,构造后 CompletableFuture 就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
                    + "任务i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.8、常用多线程并发,取结果归集的几种实现方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future 接口 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable 内部通过阻塞队列 + FutureTask 接口 JDK8 实现了 Future, CompletionStage 两个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 原生 API 支持,返回每个任务的异常
建议 CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 API 极端丰富,配合流式编程,推荐使用!

上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

 

   

CompletableFuture allof(..)join()与CompletableFuture.join()

CompletableFuture allof(..)join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

CompletableFuture allof(..)。join()与CompletableFuture.join()

CompletableFuture allof(..)。join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture 异常处理completeExceptionally可以把异常抛到主线程
/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    @Test
    public void testMethod() {

        String[] orders = {"1", "2", "3", "4", "5", "6"};

        List<CompletableFuture<Boolean>> futures = new ArrayList<>();

        Arrays.stream(orders).forEach(id -> {
            try{
                futures.add(submitAsync(id));
            }catch (Exception ex){
                System.out.println(ex);
            }
        });

        futures.stream().forEach(f-> {
            try {
                System.out.println(f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

    private static CompletableFuture<Boolean> submitAsync(String order) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Boolean result = submit(order);
                future.complete(result);
            } catch (Exception ex) {
                future.completeExceptionally(ex);
            }
        }).start();
        return future;
    }

}

 

使用 CompletableFuture.supplyAsync  简化代码 加入线程池,exceptionally处理异常

/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    ExecutorService executor = Executors.newFixedThreadPool(3);

    @Test
    public void testMethod() {
        String[] orders = {"1", "2", "3", "4", "5", "6"};
        Arrays.stream(orders).forEach(id -> CompletableFuture.supplyAsync(() -> submit(id), executor).exceptionally(e -> {
            System.out.println(e);
            return false;
        }));

        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

}

 

今天关于创建已经完成的CompletableFuture的正确方法是什么创建的件的分享就到这里,希望大家有所收获,若想了解更多关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()、CompletableFuture CompletableFuture.supplyAsync 异常处理等相关知识,可以在本站进行查询。

本文标签: