GVKun编程网logo

什么时候应该 TaskCompletionSource使用?(task前面加什么)

4

本文的目的是介绍什么时候应该TaskCompletionSource使用?的详细情况,特别关注task前面加什么的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解什么时

本文的目的是介绍什么时候应该 TaskCompletionSource使用?的详细情况,特别关注task前面加什么的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解什么时候应该 TaskCompletionSource使用?的机会,同时也不会遗漏关于.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件、005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、C ++标准库-什么时候应该使用它,什么时候不应该使用?、c# TaskCompletionSource的知识。

本文目录一览:

什么时候应该 TaskCompletionSource使用?(task前面加什么)

什么时候应该 TaskCompletionSource使用?(task前面加什么)

AFAIK,它所知道的是,在某些时候,它的SetResultorSetException方法被调用来完成Task<T>通过它的Task属性暴露。

换句话说,它充当 aTask<TResult>及其完成的生产者。

我在 这里 看到了这个例子:

如果我需要一种方法来Func<T>异步执行 a 并有 aTask<T> 来表示该操作。

public static Task<T> RunAsync<T>(Func<T> function) {     if (function == null) throw new ArgumentNullException(鈥渇unction鈥�);     var tcs = new TaskCompletionSource<T>();     ThreadPool.QueueUserWorkItem(_ =>     {         try         {              T result = function();             tcs.SetResult(result);          }         catch(Exception exc) { tcs.SetException(exc); }     });     return tcs.Task; }

如果我没有 - 可以使用Task.Factory.StartNew- 但我 确实Task.Factory.StartNew

问题:

有人可以通过示例解释一个与我没有的假设情况 直接 相关TaskCompletionSource 而不是与
假设Task.Factory.StartNew情况相关的场景吗?

答案1

小编典典

我主要在只有基于事件的 API 可用时使用它(例如 Windows Phone 8
套接字):

public Task<Args> SomeApiWrapper(){    TaskCompletionSource<Args> tcs = new TaskCompletionSource<Args>();    var obj = new SomeApi();    // will get raised, when the work is done    obj.Done += (args) =>     {        // this will notify the caller         // of the SomeApiWrapper that         // the task just completed        tcs.SetResult(args);    }    // start the work    obj.Do();    return tcs.Task;}

async因此,它与 C#5关键字一起使用时特别有用。

.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件

.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件

 

你可以使用临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphores)和事件(Event)来处理线程同步。然而,在编写一些异步处理函数,尤其是还有 async 和 await 使用的时候,还有一些更方便的类型可以用来处理线程同步。

使用 TaskCompletionSource,你可以轻松地编写既可以异步等待,又可以同步等待的代码来。


 

本文内容

      • 等待事件
      • 引发事件

 

等待事件

我们创建一个 TaskCompletionSource<object> 对象,这样,我们便可以写出一个既可以同步等待又可以异步等待的方法:

public class WalterlvDemo
{
    private readonly TaskCompletionSource<object> _source = new TaskCompletionSource<object>();

    public Task WaitAsync() => _source.Task;

    public void Wait() => _source.Task.GetAwaiter().GetResult();
}

等待时可以同步:

demo.Wait();

也可以异步:

await demo.WaitAsync();

而同步的那个方法,便可以用来做线程同步使用。

引发事件

要像一个事件一样让同步等待阻塞着的线程继续跑起来,则需要设置这个事件。

TaskCompletionSource<object> 提供了很多让任务完成的方法:

TaskCompletionSource 中的方法

可以通过让这个 TaskCompletionSource<object> 完成、取消或设置异常的方式让这个 Task 进入完成、取消或错误状态,然后等待它的线程就会继续执行;当然如果有异常,就会让等待的线程收到一个需要处理的异常。

_source.SetResult(null);

我的博客会首发于 https://walterlv.com/,而 CSDN 和博客园仅从其中摘选发布,而且一旦发布了就不再更新。

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。欢迎转载、使用、重新发布,但务必保留文章署名吕毅(包含链接:https://blog.csdn.net/wpwalter),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

一、概述

  创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

  详述:https://www.cnblogs.com/bjlhx/p/7588971.html

1.1、Runnable 接口

它是一个接口,里面只声明了一个 run () 方法:

public interface Runnable {
    public abstract void run();
}

由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。

1.2、Callable 接口

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。

public interface Callable<V> {   
    V call() throws Exception;
}

是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。

1.3、Future 接口

  Future 的核心思想是:

    一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。

  Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:

    get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

    cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。

    isDone 方法:判断是否计算完

    isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  也就是说 Future 提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

  Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

使用 Callable+Future 获取执行结果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.4、FutureTask 类

FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。

如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:

把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。

1.4.1、FutureTask 结构

  

1.4.2、FutureTask 使用

方式一、使用 thread 方式

  FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动

public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("计算线程正在计算结果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main线程干点别的...");

        Integer result = task.get();
        System.out.println("从计算线程拿到的结果为:" + result);
    }

方式二、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式

public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);
        //同时开启了两个任务
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 关闭线程池和服务
                    return;
                }

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用 Callable+FutureTask 获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.5、CompletionService

原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用内部阻塞队列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("总耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}
View Code

建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。

二、CompletableFuture

2.1、对标 Futrue

  Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

  阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。

    Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。

2.2、类图

  

2.2.1、CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.2.2、Future

2.3、创建 CompletableFuture 对象

  CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.

  以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。

  runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。

  supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。

  注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。

示例:简单同步用法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //长时间的计算任务
            try {
                System.out.println("计算型任务开始");
                Thread.sleep(2000);
                return "计算型任务结束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.4、计算结果完成时的处理

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

  方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.5、转换

CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.6、异常处理 completeExceptionally

  为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

  这样,当执行任务发生异常时,调用 get() 方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的 Exception 参数。

/**
     * 任务没有异常 正常执行,然后结束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

    /**
     * 线程有异常  正常执行,然后无法结束,主线程会一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
View Code
/**
     * 线程有异常  正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
     * 主线程接收到 程序继续处理,至结束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

2.7、多任务组合方法 allOf 和 anyOf

allOf 是等待所有任务完成,构造后 CompletableFuture 完成

anyOf 是只要有一个任务完成,构造后 CompletableFuture 就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
                    + "任务i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.8、常用多线程并发,取结果归集的几种实现方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future 接口 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable 内部通过阻塞队列 + FutureTask 接口 JDK8 实现了 Future, CompletionStage 两个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 原生 API 支持,返回每个任务的异常
建议 CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 API 极端丰富,配合流式编程,推荐使用!

上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

 

   

C ++标准库-什么时候应该使用它,什么时候不应该使用?

C ++标准库-什么时候应该使用它,什么时候不应该使用?

我想知道人们实际上经常使用许多标准c ++库,尤其是<algorithm>and
<numeric>标头中的内容。教科书似乎推荐它们,但是我从没见过我筛选过的各个项目中的任何一个(巧合?),就个人而言,每次自己编写适当的简单算法似乎比记住或记住更容易。每次查阅这些标头的参考。只是懒惰还是固执?使用这些库时,实际上是否有性能提升等?

谢谢,

[R

c# TaskCompletionSource

c# TaskCompletionSource

对 TaskCompletionSource 完全不清楚,msdn 上有介绍:https://technet.microsoft.com/zh-cn/library/dd449174(v=vs.110).aspx,

https://www.cnblogs.com/farb/p/4870421.html,

https://msdn.microsoft.com/zh-cn/library/dd997423(v=vs.100).aspx,

https://msdn.microsoft.com/zh-cn/library/dd537609(v=vs.100).aspx,

https://msdn.microsoft.com/zh-cn/library/dd997423 (v=vs.100).aspx 中有一句:TaskCompletionSource<TResult> 创建的任何任务将由 TaskCompletionSource 启动,因此,用户代码不应在该任务上调用 Start 方法。

还有一篇 TaskCompletionSource 相关的异步编程好帖子 全面解析 C# 中的异步编程 https://www.cnblogs.com/xiaoyaojian/p/4603238.html 其中有一段代码做实验应该会有收获:

static async void ReadAssignedFile()
        {
            byte[] buffer;
            try
            {
                double length = await ReadFileAsync("SomeFileDoNotExisted.txt", out buffer);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        static Task<double> ReadFileAsync(string filePath,out byte[] buffer) 
        {
            Stream stream = File.Open(filePath, FileMode.Open);
            buffer = new byte[stream.Length];
            var tcs = new TaskCompletionSource<double>();
            stream.BeginRead(buffer, 0, buffer.Length, arr =>
            {
                try
                {
                    var length = stream.EndRead(arr);
                    tcs.SetResult(stream.Length);
                }
                catch (IOException ex)
                {
                    tcs.SetException(ex);
                }
            }, null);
            return tcs.Task;
        }

https://msdn.microsoft.com/zh-cn/magazine/ff959203.aspx

https://msdn.microsoft.com/zh-cn/library/ee622454(v=vs.100).aspx 也有介绍,现在开始做实验:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TaskCompletionSourceDemo
{
    class WebDataDownloader
    {
        static void Main()
        {
            WebDataDownloader downloader = new WebDataDownloader();
            string[] addresses = { "http://www.baidu.com", "http://www.yahoo.com",
                                 "http://www.sina.com", "http://www.qq.com" };
            CancellationTokenSource cts = new CancellationTokenSource();

            // Create a UI thread from which to cancel the entire operation
            Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Press c to cancel");
                if (Console.ReadKey().KeyChar == ''c'')
                    cts.Cancel();
            });

            // Using a neutral search term that is sure to get some hits.
            Task<string[]> webTask = downloader.GetWordCounts(addresses, "the", cts.Token);

            // Do some other work here unless the method has already completed.
            if (!webTask.IsCompleted)
            {
                // Simulate some work.
                Thread.SpinWait(5000000);
            }

            string[] results = null;
            try
            {
                results = webTask.Result;
            }
            catch (AggregateException e)
            {
                foreach (var ex in e.InnerExceptions)
                {
                    OperationCanceledException oce = ex as OperationCanceledException;
                    if (oce != null)
                    {
                        if (oce.CancellationToken == cts.Token)
                        {
                            Console.WriteLine("Operation canceled by user.");
                        }
                    }
                    else
                        Console.WriteLine(ex.Message);
                }
            }

            if (results != null)
            {
                foreach (var item in results)
                    Console.WriteLine(item);
            }
            Console.ReadKey();
        }

        Task<string[]> GetWordCounts(string[] urls, string name, CancellationToken token)
        {
            TaskCompletionSource<string[]> tcs = new TaskCompletionSource<string[]>();
            WebClient[] webClients = new WebClient[urls.Length];

            // If the user cancels the CancellationToken, then we can use the
            // WebClient''s ability to cancel its own async operations.
            token.Register(() =>
            {
                foreach (var wc in webClients)
                {
                    if (wc != null)
                        wc.CancelAsync();
                }
            });

            object m_lock = new object();
            int count = 0;
            List<string> results = new List<string>();
            for (int i = 0; i < urls.Length; i++)
            {
                webClients[i] = new WebClient();

                #region callback
                // Specify the callback for the DownloadStringCompleted
                // event that will be raised by this WebClient instance.
                webClients[i].DownloadStringCompleted += (obj, args) =>
                {
                    if (args.Cancelled == true)
                    {
                        tcs.TrySetCanceled();
                        return;
                    }
                    else if (args.Error != null)
                    {
                        // Pass through to the underlying Task
                        // any exceptions thrown by the WebClient
                        // during the asynchronous operation.
                        tcs.TrySetException(args.Error);
                        return;
                    }
                    else
                    {
                        // Split the string into an array of words,
                        // then count the number of elements that match
                        // the search term.
                        string[] words = null;
                        words = args.Result.Split('' '');
                        string NAME = name.ToUpper();
                        int nameCount = (from word in words.AsParallel()
                                         where word.ToUpper().Contains(NAME)
                                         select word)
                                        .Count();

                        // Associate the results with the url, and add new string to the array that 
                        // the underlying Task object will return in its Result property.
                        results.Add(String.Format("{0} has {1} instances of {2}", args.UserState, nameCount, name));
                    }

                    // If this is the last async operation to complete,
                    // then set the Result property on the underlying Task.
                    lock (m_lock)
                    {
                        count++;
                        if (count == urls.Length)
                        {
                            tcs.TrySetResult(results.ToArray());
                        }
                    }
                };
                #endregion

                // Call DownloadStringAsync for each URL.
                Uri address = null;
                try
                {
                    address = new Uri(urls[i]);
                    // Pass the address, and also use it for the userToken 
                    // to identify the page when the delegate is invoked.
                    webClients[i].DownloadStringAsync(address, address);
                }

                catch (UriFormatException ex)
                {
                    // Abandon the entire operation if one url is malformed.
                    // Other actions are possible here.
                    tcs.TrySetException(ex);
                    return tcs.Task;
                }
            }

            // Return the underlying Task. The client code
            // waits on the Result property, and handles exceptions
            // in the try-catch block there.
            return tcs.Task;
        }
    }
}

程序执行的结果如下:

Press c to cancel
http://www.baidu.com/ has 1 instances of the
http://www.qq.com/ has 72 instances of the
http://www.sina.com/ has 2 instances of the
http://www.yahoo.com/ has 0 instances of the

我们今天的关于什么时候应该 TaskCompletionSource使用?task前面加什么的分享就到这里,谢谢您的阅读,如果想了解更多关于.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件、005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、C ++标准库-什么时候应该使用它,什么时候不应该使用?、c# TaskCompletionSource的相关信息,可以在本站进行搜索。

本文标签: