本文的目的是介绍什么时候应该TaskCompletionSource使用?的详细情况,特别关注task前面加什么的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解什么时
本文的目的是介绍什么时候应该 TaskCompletionSource使用?的详细情况,特别关注task前面加什么的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解什么时候应该 TaskCompletionSource使用?的机会,同时也不会遗漏关于.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件、005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、C ++标准库-什么时候应该使用它,什么时候不应该使用?、c# TaskCompletionSource的知识。
本文目录一览:- 什么时候应该 TaskCompletionSource使用?(task前面加什么)
- .NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件
- 005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture
- C ++标准库-什么时候应该使用它,什么时候不应该使用?
- c# TaskCompletionSource
什么时候应该 TaskCompletionSource使用?(task前面加什么)
AFAIK,它所知道的是,在某些时候,它的SetResult
orSetException
方法被调用来完成Task<T>
通过它的Task
属性暴露。
换句话说,它充当 aTask<TResult>
及其完成的生产者。
我在 这里 看到了这个例子:
如果我需要一种方法来
Func<T>
异步执行 a 并有 aTask<T>
来表示该操作。
public static Task<T> RunAsync<T>(Func<T> function) { if (function == null) throw new ArgumentNullException(鈥渇unction鈥�); var tcs = new TaskCompletionSource<T>(); ThreadPool.QueueUserWorkItem(_ => { try { T result = function(); tcs.SetResult(result); } catch(Exception exc) { tcs.SetException(exc); } }); return tcs.Task; }
如果我没有 - 可以使用Task.Factory.StartNew
- 但我 确实 有Task.Factory.StartNew
。
问题:
有人可以通过示例解释一个与我没有的假设情况 直接 相关TaskCompletionSource
而不是与
假设Task.Factory.StartNew
情况相关的场景吗?
答案1
小编典典我主要在只有基于事件的 API 可用时使用它(例如 Windows Phone 8
套接字):
public Task<Args> SomeApiWrapper(){ TaskCompletionSource<Args> tcs = new TaskCompletionSource<Args>(); var obj = new SomeApi(); // will get raised, when the work is done obj.Done += (args) => { // this will notify the caller // of the SomeApiWrapper that // the task just completed tcs.SetResult(args); } // start the work obj.Do(); return tcs.Task;}
async
因此,它与 C#5关键字一起使用时特别有用。
.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件
你可以使用临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphores)和事件(Event)来处理线程同步。然而,在编写一些异步处理函数,尤其是还有 async 和 await 使用的时候,还有一些更方便的类型可以用来处理线程同步。
使用 TaskCompletionSource
,你可以轻松地编写既可以异步等待,又可以同步等待的代码来。
本文内容
- 等待事件
- 引发事件
等待事件
我们创建一个 TaskCompletionSource<object>
对象,这样,我们便可以写出一个既可以同步等待又可以异步等待的方法:
public class WalterlvDemo
{
private readonly TaskCompletionSource<object> _source = new TaskCompletionSource<object>();
public Task WaitAsync() => _source.Task;
public void Wait() => _source.Task.GetAwaiter().GetResult();
}
等待时可以同步:
demo.Wait();
也可以异步:
await demo.WaitAsync();
而同步的那个方法,便可以用来做线程同步使用。
引发事件
要像一个事件一样让同步等待阻塞着的线程继续跑起来,则需要设置这个事件。
而 TaskCompletionSource<object>
提供了很多让任务完成的方法:
可以通过让这个 TaskCompletionSource<object>
完成、取消或设置异常的方式让这个 Task 进入完成、取消或错误状态,然后等待它的线程就会继续执行;当然如果有异常,就会让等待的线程收到一个需要处理的异常。
_source.SetResult(null);
我的博客会首发于 https://walterlv.com/,而 CSDN 和博客园仅从其中摘选发布,而且一旦发布了就不再更新。
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。欢迎转载、使用、重新发布,但务必保留文章署名吕毅(包含链接:https://blog.csdn.net/wpwalter),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture
一、概述
创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。
详述:https://www.cnblogs.com/bjlhx/p/7588971.html
1.1、Runnable 接口
它是一个接口,里面只声明了一个 run () 方法:
public interface Runnable {
public abstract void run();
}
由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。
1.2、Callable 接口
Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。
public interface Callable<V> {
V call() throws Exception;
}
是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。
1.3、Future 接口
Future 的核心思想是:
一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。
Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:
get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。
cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。
isDone 方法:判断是否计算完
isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
也就是说 Future 提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。
因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。
使用 Callable+Future 获取执行结果:


public class Test {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果"+result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
1.4、FutureTask 类
FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。
如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:
把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。
1.4.1、FutureTask 结构
1.4.2、FutureTask 使用
方式一、使用 thread 方式
FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动
public static void main(String[] args) throws Exception {
Callable<Integer> call = () -> {
System.out.println("计算线程正在计算结果...");
Thread.sleep(3000);
return 1;
};
FutureTask<Integer> task = new FutureTask<>(call);
Thread thread = new Thread(task);
thread.start();
System.out.println("main线程干点别的...");
Integer result = task.get();
System.out.println("从计算线程拿到的结果为:" + result);
}
方式二、使用 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式


public static void main(String[] args) {
Callable<String> callable1=()->{
Thread.sleep(2000);
return Thread.currentThread().getName();
};
Callable<String> callable2=()->{
Thread.sleep(3000);
return Thread.currentThread().getName();
};
FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
FutureTask<String> futureTask2 = new FutureTask<>(callable2);
ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例
executor.execute(futureTask1); // 执行任务
executor.execute(futureTask2);
//同时开启了两个任务
long startTime = System.currentTimeMillis();
while (true) {
try {
if(futureTask1.isDone() && futureTask2.isDone()){// 两个任务都完成
System.out.println("Done");
executor.shutdown(); // 关闭线程池和服务
return;
}
if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
System.out.println("FutureTask1 output="+futureTask1.get());
}
System.out.println("Waiting for FutureTask2 to complete");
String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
if(s !=null){
System.out.println("FutureTask2 output="+s);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}catch(TimeoutException e){
//do nothing
}
System.out.println((System.currentTimeMillis()-startTime));
}
}
使用 Callable+FutureTask 获取执行结果


public class Test {
public static void main(String[] args) {
//第一种方式
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
executor.shutdown();
//第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
/*Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();*/
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果"+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
1.5、CompletionService
原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。


package com.lhx.cloud.futruetask;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceDemo {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//开启5个线程
ExecutorService exs = Executors.newFixedThreadPool(5);
try {
int taskCount = 10;
//结果集
List<Integer> list = new ArrayList<>();
//1.定义CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
List<Future<Integer>> futureList = new ArrayList<>();
//2.添加任务
for(int i=0;i<taskCount;i++){
futureList.add(completionService.submit(new Task(i+1)));
}
//==================结果归集===================
//方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
// for (Future<Integer> future : futureList) {
// System.out.println("====================");
// Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
// System.out.println("任务result="+result+"获取到结果!"+new Date());
// list.add(result);
// }
// //方法2.使用内部阻塞队列的take()
for(int i=0;i<taskCount;i++){
Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
list.add(result);
}
System.out.println("list="+list);
System.out.println("总耗时="+(System.currentTimeMillis()-start));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();//关闭线程池
}
}
static class Task implements Callable<Integer>{
Integer i;
public Task(Integer i) {
super();
this.i=i;
}
@Override
public Integer call() throws Exception {
if(i==5){
Thread.sleep(5000);
}else{
Thread.sleep(1000);
}
System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
return i;
}
}
}
建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。
二、CompletableFuture
2.1、对标 Futrue
Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。
Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。
2.2、类图
2.2.1、CompletionStage
-
CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
-
一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())
-
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
2.2.2、Future
2.3、创建 CompletableFuture 对象
CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.
以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。
runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。
supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。
注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。
示例:简单同步用法


public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//长时间的计算任务
try {
System.out.println("计算型任务开始");
Thread.sleep(2000);
return "计算型任务结束";
} catch (InterruptedException e) {
e.printStackTrace();
}
return "·00";
});
System.out.println(future.get());
}
2.4、计算结果完成时的处理
当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
示例:


package com.lhx.cloud.futruetask;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class BasicFuture {
private static Random rand = new Random();
private static long t = System.currentTimeMillis();
static int getMoreData() {
System.out.println("begin to start compute");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
return rand.nextInt(1000);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(f.get());
}}
2.5、转换
CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
2.6、异常处理 completeExceptionally
为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally
方法将导致 CompletableFuture 内发生问题的异常抛出。
这样,当执行任务发生异常时,调用 get()
方法的线程将会收到一个 ExecutionException
异常,该异常接收了一个包含失败原因的 Exception 参数。


/**
* 任务没有异常 正常执行,然后结束
*/
@Test
public void test1() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
/**
* 线程有异常 正常执行,然后无法结束,主线程会一直等待
*/
@Test
public void test2() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
/**
* 线程有异常 正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
* 主线程接收到 程序继续处理,至结束
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i = 1/0;
} catch (Exception e) {
// 告诉completableFuture任务发生异常了
completableFuture.completeExceptionally(e);
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
2.7、多任务组合方法 allOf 和 anyOf
allOf
是等待所有任务完成,构造后 CompletableFuture 完成
anyOf
是只要有一个任务完成,构造后 CompletableFuture 就完成


package com.lhx.cloud.futruetask;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
// 结果集
List<String> list = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
// 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = taskList.stream()
.map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
.thenApply(h -> Integer.toString(h))
.whenComplete((s, e) -> {
System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
list.add(s);
})
).toArray(CompletableFuture[]::new);
// 封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
}
public static Integer calc(Integer i) {
try {
if (i == 1) {
Thread.sleep(3000);//任务1耗时3秒
} else if (i == 5) {
Thread.sleep(5000);//任务5耗时5秒
} else {
Thread.sleep(1000);//其它任务耗时1秒
}
System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
+ "任务i=" + i + ",完成!" );
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
2.8、常用多线程并发,取结果归集的几种实现方案
描述 | Future | FutureTask | CompletionService | CompletableFuture |
---|---|---|---|---|
原理 | Future 接口 | 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable | 内部通过阻塞队列 + FutureTask 接口 | JDK8 实现了 Future, CompletionStage 两个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 按照提交顺序获取结果 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 原生 API 支持,返回每个任务的异常 |
建议 | CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 | 功能不对口,并发任务这一块多套一层,不推荐使用 | 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 | API 极端丰富,配合流式编程,推荐使用! |
上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html
C ++标准库-什么时候应该使用它,什么时候不应该使用?
我想知道人们实际上经常使用许多标准c ++库,尤其是<algorithm>
and
<numeric>
标头中的内容。教科书似乎推荐它们,但是我从没见过我筛选过的各个项目中的任何一个(巧合?),就个人而言,每次自己编写适当的简单算法似乎比记住或记住更容易。每次查阅这些标头的参考。只是懒惰还是固执?使用这些库时,实际上是否有性能提升等?
谢谢,
[R
c# TaskCompletionSource
对 TaskCompletionSource 完全不清楚,msdn 上有介绍:https://technet.microsoft.com/zh-cn/library/dd449174(v=vs.110).aspx,
https://www.cnblogs.com/farb/p/4870421.html,
https://msdn.microsoft.com/zh-cn/library/dd997423(v=vs.100).aspx,
https://msdn.microsoft.com/zh-cn/library/dd537609(v=vs.100).aspx,
https://msdn.microsoft.com/zh-cn/library/dd997423 (v=vs.100).aspx 中有一句:TaskCompletionSource<TResult> 创建的任何任务将由 TaskCompletionSource 启动,因此,用户代码不应在该任务上调用 Start 方法。
还有一篇 TaskCompletionSource 相关的异步编程好帖子 全面解析 C# 中的异步编程 https://www.cnblogs.com/xiaoyaojian/p/4603238.html 其中有一段代码做实验应该会有收获:
static async void ReadAssignedFile()
{
byte[] buffer;
try
{
double length = await ReadFileAsync("SomeFileDoNotExisted.txt", out buffer);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
static Task<double> ReadFileAsync(string filePath,out byte[] buffer)
{
Stream stream = File.Open(filePath, FileMode.Open);
buffer = new byte[stream.Length];
var tcs = new TaskCompletionSource<double>();
stream.BeginRead(buffer, 0, buffer.Length, arr =>
{
try
{
var length = stream.EndRead(arr);
tcs.SetResult(stream.Length);
}
catch (IOException ex)
{
tcs.SetException(ex);
}
}, null);
return tcs.Task;
}
https://msdn.microsoft.com/zh-cn/magazine/ff959203.aspx
https://msdn.microsoft.com/zh-cn/library/ee622454(v=vs.100).aspx 也有介绍,现在开始做实验:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TaskCompletionSourceDemo
{
class WebDataDownloader
{
static void Main()
{
WebDataDownloader downloader = new WebDataDownloader();
string[] addresses = { "http://www.baidu.com", "http://www.yahoo.com",
"http://www.sina.com", "http://www.qq.com" };
CancellationTokenSource cts = new CancellationTokenSource();
// Create a UI thread from which to cancel the entire operation
Task.Factory.StartNew(() =>
{
Console.WriteLine("Press c to cancel");
if (Console.ReadKey().KeyChar == ''c'')
cts.Cancel();
});
// Using a neutral search term that is sure to get some hits.
Task<string[]> webTask = downloader.GetWordCounts(addresses, "the", cts.Token);
// Do some other work here unless the method has already completed.
if (!webTask.IsCompleted)
{
// Simulate some work.
Thread.SpinWait(5000000);
}
string[] results = null;
try
{
results = webTask.Result;
}
catch (AggregateException e)
{
foreach (var ex in e.InnerExceptions)
{
OperationCanceledException oce = ex as OperationCanceledException;
if (oce != null)
{
if (oce.CancellationToken == cts.Token)
{
Console.WriteLine("Operation canceled by user.");
}
}
else
Console.WriteLine(ex.Message);
}
}
if (results != null)
{
foreach (var item in results)
Console.WriteLine(item);
}
Console.ReadKey();
}
Task<string[]> GetWordCounts(string[] urls, string name, CancellationToken token)
{
TaskCompletionSource<string[]> tcs = new TaskCompletionSource<string[]>();
WebClient[] webClients = new WebClient[urls.Length];
// If the user cancels the CancellationToken, then we can use the
// WebClient''s ability to cancel its own async operations.
token.Register(() =>
{
foreach (var wc in webClients)
{
if (wc != null)
wc.CancelAsync();
}
});
object m_lock = new object();
int count = 0;
List<string> results = new List<string>();
for (int i = 0; i < urls.Length; i++)
{
webClients[i] = new WebClient();
#region callback
// Specify the callback for the DownloadStringCompleted
// event that will be raised by this WebClient instance.
webClients[i].DownloadStringCompleted += (obj, args) =>
{
if (args.Cancelled == true)
{
tcs.TrySetCanceled();
return;
}
else if (args.Error != null)
{
// Pass through to the underlying Task
// any exceptions thrown by the WebClient
// during the asynchronous operation.
tcs.TrySetException(args.Error);
return;
}
else
{
// Split the string into an array of words,
// then count the number of elements that match
// the search term.
string[] words = null;
words = args.Result.Split('' '');
string NAME = name.ToUpper();
int nameCount = (from word in words.AsParallel()
where word.ToUpper().Contains(NAME)
select word)
.Count();
// Associate the results with the url, and add new string to the array that
// the underlying Task object will return in its Result property.
results.Add(String.Format("{0} has {1} instances of {2}", args.UserState, nameCount, name));
}
// If this is the last async operation to complete,
// then set the Result property on the underlying Task.
lock (m_lock)
{
count++;
if (count == urls.Length)
{
tcs.TrySetResult(results.ToArray());
}
}
};
#endregion
// Call DownloadStringAsync for each URL.
Uri address = null;
try
{
address = new Uri(urls[i]);
// Pass the address, and also use it for the userToken
// to identify the page when the delegate is invoked.
webClients[i].DownloadStringAsync(address, address);
}
catch (UriFormatException ex)
{
// Abandon the entire operation if one url is malformed.
// Other actions are possible here.
tcs.TrySetException(ex);
return tcs.Task;
}
}
// Return the underlying Task. The client code
// waits on the Result property, and handles exceptions
// in the try-catch block there.
return tcs.Task;
}
}
}
程序执行的结果如下:
Press c to cancel
http://www.baidu.com/ has 1 instances of the
http://www.qq.com/ has 72 instances of the
http://www.sina.com/ has 2 instances of the
http://www.yahoo.com/ has 0 instances of the
我们今天的关于什么时候应该 TaskCompletionSource使用?和task前面加什么的分享就到这里,谢谢您的阅读,如果想了解更多关于.NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件、005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、C ++标准库-什么时候应该使用它,什么时候不应该使用?、c# TaskCompletionSource的相关信息,可以在本站进行搜索。
本文标签: