本文将分享futureTask的超时原理解析的详细内容,并且还将对futuretaskcallable进行详尽解释,此外,我们还将为大家带来关于Callable、Future和FutureTask、C
本文将分享futureTask的超时原理解析的详细内容,并且还将对futuretask callable进行详尽解释,此外,我们还将为大家带来关于Callable、Future和FutureTask、Callable、Future和FutureTask浅析、Callable,Future 和 FutureTask 详解、Executor 框架(七)Future 接口、FutureTask 类的相关知识,希望对你有所帮助。
本文目录一览:- futureTask的超时原理解析(futuretask callable)
- Callable、Future和FutureTask
- Callable、Future和FutureTask浅析
- Callable,Future 和 FutureTask 详解
- Executor 框架(七)Future 接口、FutureTask 类
futureTask的超时原理解析(futuretask callable)
序
本文主要解析一下futureTask的超时原理。
实例
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<?> future = executor.submit(
new Callable<Void>() {
public Void call() throws Exception {
//do something
return null;
}
});
future.get(500, TimeUnit.MILLISECONDS);
里头构造的是java/util/concurrent/ThreadPoolExecutor.java
submit
java/util/concurrent/AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
execute
java/util/concurrent/ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn''t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这里只是放入workQueue,然后判断是否需要添加线程
runWorker
java/util/concurrent/ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里循环从workQueue取出task,然后调用task.run()
futureTask.run
java/util/concurrent/FutureTask.java
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这里如果执行完成的话,会调用set(result),而异常的话,会调用setException(ex)
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
都把状态从NEW设置为COMPLETING
future.get(long)
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
这里看awaitDone,等待指定的时候,发现状态不是COMPLETING,则抛出TimeoutException,让调用线程返回。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
这里等待超时,然后返回原始状态
小结
由此可见,超时的机制其实不能中断callable里头实际执行的动作,超时只是让调用线程能够在指定时间返回而已,而底层调用的方法,实际还在执行。这里是需要额外注意的。
Callable、Future和FutureTask
一、Callable 与 Runnable
先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:
public interface Runnable { public abstract void run(); }
由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
可以看到,这是一个泛型接口,该接口声明了一个名称为call()的方法,同时这个方法可以有返回值V,也可以抛出异常。call()方法返回的类型就是传递进来的V类型。
那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
第一个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
第三个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。
因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。
二、Future
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future<V>接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
三、FutureTask
我们先来看一下FutureTask的实现:
public class FutureTask<V> implements RunnableFuture<V>
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
分析:
FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据FutureTask.run()的执行时机来分析其所处的3种状态:
(1)未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。
(2)已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。
(3)已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。
下面我们再来看看FutureTask的方法执行示意图(方法和Future接口基本是一样的,这里就不过多描述了)
分析:
(1)当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
(2)当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(...)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(...)返回false。当任务已经完成,执行cancel(...)方法将返回false。
最后我们给出FutureTask的两种构造函数:
public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { }
事实上,FutureTask是Future接口的一个唯一实现类。
四、使用示例
通过上面的介绍,我们对Callable,Future,FutureTask都有了比较清晰的了解了,那么它们到底有什么用呢?我们前面说过通过这样的方式去创建线程的话,最大的好处就是能够返回结果,加入有这样的场景,我们现在需要计算一个数据,而这个数据的计算比较耗时,而我们后面的程序也要用到这个数据结果,那么这个时Callable岂不是最好的选择?我们可以开设一个线程去执行计算,而主线程继续做其他事,而后面需要使用到这个数据时,我们再使用Future获取不就可以了吗?下面我们就来编写一个这样的实例。
1、使用Callable+Future获取执行结果
package com.demo.test; import java.util.concurrent.Callable; public class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
package com.demo.test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableTest { public static void main(String[] args) { //创建线程池 ExecutorService executor = Executors.newCachedThreadPool(); //创建Callable对象任务 Task task = new Task(); //提交任务并获取执行结果 Future<Integer> result = executor.submit(task); //关闭线程池 executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { if(result.get()!=null){ System.out.println("task运行结果"+result.get()); }else{ System.out.println("未获取到结果"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } }
运行结果:
子线程在进行计算 主线程在执行任务 task运行结果4950 所有任务执行完毕
2、使用Callable+FutureTask获取执行结果
package com.demo.test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; public class CallableTest1 { public static void main(String[] args) { //第一种方式 ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); executor.shutdown(); //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread // Task task = new Task(); // FutureTask<Integer> futureTask = new FutureTask<Integer>(task); // Thread thread = new Thread(futureTask); // thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { if(futureTask.get()!=null){ System.out.println("task运行结果"+futureTask.get()); }else{ System.out.println("future.get()未获取到结果"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } }
运行结果同上。
补充:
实现Runnable接口和实现Callable接口的区别:
1、Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的。
2、Callable规定的方法是call(),Runnable规定的方法是run()。
3、Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。
4、call方法可以抛出异常,run方法不可以。
5、运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。
6、加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。
Callable、Future和FutureTask浅析
1.Callable<V>接口
我们先回顾一下java.lang.Runnable接口,就声明了run(),其返回值为void,当然就无法获取结果了。
public interface Runnable {
public abstract void run();
}
而Callable的接口定义如下
public interface Callable<V> {
V call() throws Exception;
}
该接口声明了一个名称为call()的方法,同时这个方法可以有返回值V,也可以抛出异常。嗯,对该接口我们先了解这么多就行,下面我们来说明如何使用,前篇文章我们说过,无论是Runnable接口的实现类还是Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行,ThreadPoolExecutor或ScheduledThreadPoolExecutor都实现了ExcutorService接口,而因此Callable需要和Executor框架中的ExcutorService结合使用,我们先看看ExecutorService提供的方法:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
第一个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
第三个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。
因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。还有点要注意的是,除了我们自己实现Callable对象外,我们还可以使用工厂类Executors来把一个Runnable对象包装成Callable对象。Executors工厂类提供的方法如下:
public static Callable<Object> callable(Runnable task)
public static <T> Callable<T> callable(Runnable task, T result)
2.Future<V>接口
Future<V>接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
方法解析:
V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。
boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。
boolean isCanceller() :如果任务完成前被取消,则返回true。
boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(...)方法将返回false;如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成,执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。
通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。
但是我们必须明白Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。
3.FutureTask类
我们先来看看FutureTask的实现
public class FutureTask<V> implements RunnableFuture<V> {
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
分析:FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据FutureTask.run()的执行时机来分析其所处的3种状态:
(1)未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。
(2)已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。
(3)已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。
下面我们再来看看FutureTask的方法执行示意图(方法和Future接口基本是一样的,这里就不过多描述了)
分析:
(1)当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
(2)当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。
当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(...)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(...)返回false。
当任务已经完成,执行cancel(...)方法将返回false。
最后我们给出FutureTask的两种构造函数:
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
3.Callable<V>/Future<V>/FutureTask的使用
通过上面的介绍,我们对Callable,Future,FutureTask都有了比较清晰的了解了,那么它们到底有什么用呢?我们前面说过通过这样的方式去创建线程的话,最大的好处就是能够返回结果,加入有这样的场景,我们现在需要计算一个数据,而这个数据的计算比较耗时,而我们后面的程序也要用到这个数据结果,那么这个时Callable岂不是最好的选择?我们可以开设一个线程去执行计算,而主线程继续做其他事,而后面需要使用到这个数据时,我们再使用Future获取不就可以了吗?下面我们就来编写一个这样的实例
3.1 使用Callable+Future获取执行结果
Callable实现类如下:
package com.zejian.Executor;
import java.util.concurrent.Callable;
/**
* @author zejian
* @time 2016年3月15日 下午2:02:42
* @decrition Callable接口实例
*/
public class CallableDemo implements Callable<Integer> {
private int sum;
@Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算啦!");
Thread.sleep(2000);
for(int i=0 ;i<5000;i++){
sum=sum+i;
}
System.out.println("Callable子线程计算结束!");
return sum;
}
}
Callable执行测试类如下:
package com.zejian.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author zejian
* @time 2016年3月15日 下午2:05:43
* @decrition callable执行测试类
*/
public class CallableTest {
public static void main(String[] args) {
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor();
//创建Callable对象任务
CallableDemo calTask=new CallableDemo();
//提交任务并获取执行结果
Future<Integer> future =es.submit(calTask);
//关闭线程池
es.shutdown();
try {
Thread.sleep(2000);
System.out.println("主线程在执行其他任务");
if(future.get()!=null){
//输出获取到的结果
System.out.println("future.get()-->"+future.get());
}else{
//输出获取到的结果
System.out.println("future.get()未获取到结果");
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主线程在执行完成");
}
}
执行结果:
Callable子线程开始计算啦!
主线程在执行其他任务
Callable子线程计算结束!
future.get()-->12497500
主线程在执行完成
3.2 使用Callable+FutureTask获取执行结果
package com.zejian.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
* @author zejian
* @time 2016年3月15日 下午2:05:43
* @decrition callable执行测试类
*/
public class CallableTest {
public static void main(String[] args) {
// //创建线程池
// ExecutorService es = Executors.newSingleThreadExecutor();
// //创建Callable对象任务
// CallableDemo calTask=new CallableDemo();
// //提交任务并获取执行结果
// Future<Integer> future =es.submit(calTask);
// //关闭线程池
// es.shutdown();
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor();
//创建Callable对象任务
CallableDemo calTask=new CallableDemo();
//创建FutureTask
FutureTask<Integer> futureTask=new FutureTask<>(calTask);
//执行任务
es.submit(futureTask);
//关闭线程池
es.shutdown();
try {
Thread.sleep(2000);
System.out.println("主线程在执行其他任务");
if(futureTask.get()!=null){
//输出获取到的结果
System.out.println("futureTask.get()-->"+futureTask.get());
}else{
//输出获取到的结果
System.out.println("futureTask.get()未获取到结果");
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主线程在执行完成");
}
}
执行结果:
Callable子线程开始计算啦!
主线程在执行其他任务
Callable子线程计算结束!
futureTask.get()-->12497500
主线程在执行完成
Callable,Future 和 FutureTask 详解
1.Callable 和 Runnable
看 Callable 接口:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
看 Runnable 接口:
public
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object''s
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
Callable 和 Runnable 都代表着任务,不同之处在于 Callable 有返回值,并且能抛出异常,Runnable 任务执行结束之后没有返回值。Callable 一般和 Future 一起使用,可以获取任务返回结果。
2.Future 接口
Future 就是对具体的 Callable 和 Runnable 任务进行操作,如:取消任务,查询任务是否完成,获取任务完成之后的返回结果(如果有返回值)。看代码:
public interface Future<V> {
// 用于取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否被取消成功,如果取消成功,返回true
boolean isCancelled();
// 任务是否已经完成,如果完成,返回true
boolean isDone();
// 获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
// 获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
由接口可知,Future 提供了四种功能:
- 取消任务
- 判断任务是否取消成功
- 判断任务是否完成
- 获取任务执行结果
通过一个 Demo 来理解 Future 的用法:
public class FutureTest {
public static class Task implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("开始执行任务。。。");
return "callable";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
List<Future<String>> results = new ArrayList<Future<String>>();
for(int i = 0;i<100;i++){
results.add(es.submit(new Task()));
}
for(Future<String> res : results){
System.out.println(res.get());
}
}
}
3.FutureTask
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
从代码可知:FutureTask 既可以作为 Runnable 被线程执行,又具有 Future 的功能。
FutureTask 的两种使用方式:
使用方式一:FutureTask+Thread
public class FutureTest {
public static class Task implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("开始执行任务。。。");
return "callable";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建FutureTask对象,并传入一个实现了Callable接口的对象作为构造参数
FutureTask<String> futureTask = new FutureTask<String>(new Task());
//创建Thread并启动
Thread thread = new Thread(futureTask);
thread.start();
}
}
使用方式二:FutureTask+ExecutorService
public class FutureTest {
public static class Task implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("开始执行任务。。。");
return "callable";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> futureTask = new FutureTask<String>(new Task());
es.submit(futureTask);
}
}
Executor 框架(七)Future 接口、FutureTask 类
Future 接口介绍
Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。 Future 一般由 ExecutorService 的 submit()、invokeAll()方法返回的,用于跟踪、获取任务在线程池中的运行情况、等待运算结果,还可以取消任务。(还有其子接口 ScheduleFuture 则由 ScheduleExecutorService 的 schedule()等方法返回);
方法描述
boolean cancel(boolean mayInterruptIfRunning):
试图取消对此任务的执行。分成以下三种情况:
- 如果任务尚未启动,则此任务将永不运行。
- 如果任务已经启动,则 mayInterruptIfRunning 参数确定是否 中断这个任务来尝试停止任务。同时,任务也应该要对中断敏感。
- 任务已完成、或已取消,或者由于某些其他原因而无法取消,返回 false。
注意: 此方法返回后,对 isDone () 的后续调用将始终返回 true。但如果此方法返回 true,则对 isCancelled () 的后续调用才将始终返回
boolean isCancelled(): 如果在任务正常完成前将其取消,则返回 true。
boolean isDone(): 如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
获取计算结果 获取计算结果的方法,JDK 提供了两个方法:阻塞获取 与 超时等待获取。这两个方法会抛出 CancellationException(任务被取消时)、InterruptedException V get(): 等待计算完成,然后获取其结果。 V get(long timeout,TimeUnit unit): 最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
@ Example <font color="blue"> 示例 </font>
下面的例子,是在单线程的线程池中提交两个任务(任务 A、任务 B)。
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//单线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
//提交两个任务
Future futureA = executor.submit(new MyCallable("futureA"));
Future futureB = executor.submit(new MyCallable("futureB"));
Thread.sleep(1000);
//在运行一秒后,判断任务A是否完成
if(futureA.isDone()){
//如果完成,则直接获取结果
double result = (double) futureA.get();
System.out.println("运算结果是:"+result);
}else{
//如果没有完成,则取消任务A
boolean b = futureA.cancel(false);
System.out.println("futureA 执行了cancel方法,返回的值是:"+b);
}
//取消任务B
futureB.cancel(false);
}
}
class MyCallable implements Callable{
String taskName;
public MyCallable(String taskName){
this.taskName = taskName;
}
@Override
public Object call() {
try {
//模拟任务的执行时间为 2s
for(int i=0;i<5;i++){
Thread.sleep(400);
System.out.println(taskName+"任务正在运行中.....");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
double d = Math.random() * 10;
return d;
}
}
运行结果:
futureA 任务正在运行中..... futureA 任务正在运行中..... futureA 执行了 cancel 方法,返回的值是:true futureA 任务正在运行中..... futureA 任务正在运行中..... futureA 任务正在运行中.....
任务 A 是在执行时被取消的,调用的 cancel(false)
方法返回的结果为 true,但是任务并没有真的停止执行。任务 B 则是在还没被执行时取消的,所以任务 B 在后续的时间内,没有执行。 可以得出结论,<font color=“blue”>cancel( false)
方法是取消尚未被执行的任务、周期任务,而不是停止正在执行的任务。当然,如果想要停止正在执行的任务,任务里面必须是中断敏感,然后 cancel(true)
,参数为 true,即在 cancel 的同时,也发出中断信号。</font>
//简单的中断处理,发现中断退出
public void run(){
while(!Thread.interrupted()){
//.....
}
}
<br/>
FutureTask 介绍
FutureTask 是一个可取消的异步计算任务,是一个独立的类,实现了 Future、Runnable 接口。<font color="blue">FutureTask 的出现是为了弥补 Thread 的不足而设计的,可以让程序员跟踪、获取任务的执行情况、计算结果 </font> 。 因为 FutureTask 实现了 Runnable,所以 FutureTaskk 可以作为参数来创建一个新的线程来执行,也可以提交给 Executor 执行。FutureTask 一旦计算完成,就不能再重新开始或取消计算。
构造方法
FutureTask(Callable<V> callable) 创建一个 FutureTask,一旦运行就执行给定的 Callable。 FutureTask(Runnable runnable, V result) 创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 。
应用场景
FutureTask 可用于异步获取执行结果或可以取消执行任务的场景;
@ Example <font color="blue"> 简单例子 </font>
下面的例子中,因为计算数据的时间比较长,所以 main 线程就额外起一个异步线程来计算数据,从而使得计算数据的同时,main 线程可以做其他工作,直到需要用到计算结果时,才去获取计算结果。 <font color="blue"> 需要注意的是,线程 thread2 并没有执行 FutureTask, 因为 FutureTask 已经在线程 thread 中完成了。一旦 FutureTask 计算完成,就不能再重新开始或取消计算。</font>
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<Double> task = new FutureTask(new MyCallable());
//创建一个线程,异步计算结果
Thread thread = new Thread(task);
thread.start();
//主线程继续工作
Thread.sleep(1000);
System.out.println("主线程等待计算结果...");
//当需要用到异步计算的结果时,阻塞获取这个结果
Double d = task.get();
System.out.println("计算结果是:"+d);
//用同一个 FutureTask 再起一个线程
Thread thread2 = new Thread(task);
thread2.start();
}
}
class MyCallable implements Callable<Double>{
@Override
public Double call() {
double d = 0;
try {
System.out.println("异步计算开始.......");
d = Math.random()*10;
d += 1000;
Thread.sleep(2000);
System.out.println("异步计算结束.......");
} catch (InterruptedException e) {
e.printStackTrace();
}
return d;
}
}
运行结果:
异步计算开始....... 主线程等待计算结果... 异步计算结束....... 计算结果是:1002.7806590582911
<br/>
除了实现 Future、Runnable 外,此类还提供了几个 protected 方法,用于扩展此类
protected void done() 当此任务转换到状态 isDone(不管是正常地还是通过取消)时,调用受保护的方法。默认实现不执行任何操作。 protected void set(V v) 除非已经设置了此 Future 或已将其取消,否则将其结果设置为给定的值。在计算成功完成时通过 run 方法内部调用此方法。 protected void setException(Throwable t) 除非已经设置了此 Future 或已将其取消,否则它将报告一个 ExecutionException,并将给定的 throwable 作为其原因。在计算失败时通过 run 方法内部调用此方法。 protected boolean runAndReset() 执行计算而不设置其结果,然后将此 Future 重置为初始状态,如果计算遇到异常或已取消,则该操作失败。本操作被设计用于那些本质上要执行多次的任务。
关于futureTask的超时原理解析和futuretask callable的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于Callable、Future和FutureTask、Callable、Future和FutureTask浅析、Callable,Future 和 FutureTask 详解、Executor 框架(七)Future 接口、FutureTask 类的相关知识,请在本站寻找。
本文标签: