本文将介绍ScheduledThreadPoolExecutor的详细情况,特别是关于ScheduledThreadPoolExecutor用法的相关信息。我们将通过案例分析、数据研究等多种方式,帮助
本文将介绍ScheduledThreadPoolExecutor的详细情况,特别是关于ScheduledThreadPoolExecutor用法的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及一些关于(二十)java多线程之ScheduledThreadPoolExecutor、12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析、22. 线程池之 ScheduledThreadPoolExecutor、Executors类的newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool的知识。
本文目录一览:- ScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor用法)
- (二十)java多线程之ScheduledThreadPoolExecutor
- 12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析
- 22. 线程池之 ScheduledThreadPoolExecutor
- Executors类的newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool
ScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor用法)
java提供了方便的定时器功能,代码示例:
public class ScheduledThreadPool_Test {
static class Command implements Runnable {
@Override
public void run() {
System.out.println("zhang");
}
}
public static void main(String[] args) throws IOException {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleWithFixedDelay(new Command(), 1000, 5000, TimeUnit.MILLISECONDS);
System.in.read();
}
}
接下来分析ScheduledThreadPoolExecutor:
// 省略其他代码
public class Executors {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//把任务添加到队列中,创建工作线程
delayedExecute(t);
return t;
}
}
调用scheduleWithFixedDelay方法,把任务添加到DelayedWorkQueue,并启动工作线程。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//把任务添加到队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); // 创建线程
}
}
从队列中取任务的调用栈:
任务在执行的时候,会新建一个任务,放入队列中,这样就实现了定时任务的功能。
从上面能看出:定时的功能主要是由DelayedWorkQueue和ScheduledFutureTask保证的。
DelayedWorkQueue的底层数据结构是由数组实现的堆(堆是一棵完全二叉树,以小顶堆为例,parent节点值小于左右孩子节点的值):
// 省略其他代码
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture[] queue =
new RunnableScheduledFuture[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null;
private final Condition available = lock.newCondition();
private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
private void siftDown(int k, RunnableScheduledFuture key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
ScheduledFutureTask是周期任务:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
//当2个task的时间相同时,用来比较task优先级
private final long sequenceNumber;
//任务执行时间 nanoTime units
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
// DelayedWorkQueue中堆的下标
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 堆在siftUp和siftDown时需要比较大小
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
// 设置周期任务的下一次执行时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下次任务的时间
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
// ScheduledThreadPoolExecutor
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
// ThreadPoolExecutor
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
(二十)java多线程之ScheduledThreadPoolExecutor
本人邮箱: kco1989@qq.com
欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kco
github: https://github.com/kco1989/kco
代码已经全部托管github有需要的同学自行下载
引言
java 提供的线程池还有一个,那就是任务调度线程池ScheduledThreadPoolExecutor
,它其实是ThreadPoolExecutor
的一个子类.
理论
我们通过查看ScheduledThreadPoolExecutor
的源代码,可以发现ScheduledThreadPoolExecutor
的构造器都是调用父类的构造器,只是它使用的工作队列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
通过名字我们都可以猜到这个是一个延时工作队列.
因为ScheduledThreadPoolExecutor
的最大线程是Integer.MAX_VALUE,而且根据源码可以看到execute
和submit
其实都是调用schedule
这个方法,而且延时时间都是指定为0,所以调用execute
和submit
的任务都直接被执行.
例子 搞几个延时炸弹
我们搞几个延时炸弹,让它们每个5s炸一次
public class TestMain {
public static void main(String[] args) throws InterruptedException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(5);
for (int i = 0; i < 5; i ++){
final int temp = i + 1;
pool.schedule(() -> {
System.out.println("第"+temp+"个炸弹爆炸时间:" + simpleDateFormat.format(new Date()));
}, temp * 5, TimeUnit.SECONDS);
}
pool.shutdown();
System.out.println("end main时间:" + simpleDateFormat.format(new Date()));
}
}
运行结果:
end main时间:2016-11-03 19:58:31
第1个炸弹爆炸时间:2016-11-03 19:58:36
第2个炸弹爆炸时间:2016-11-03 19:58:41
第3个炸弹爆炸时间:2016-11-03 19:58:46
第4个炸弹爆炸时间:2016-11-03 19:58:51
第5个炸弹爆炸时间:2016-11-03 19:58:56
ok,这个类相对比较简单,我就不多讲了
后记
在正在项目中,一般如果需要使用定时任务,不会直接使用这个类的.有一个quartz
已经把定时任务封装的很好了.它是通过cron
表示时,可以指定某一个任务每天执行,或者每周三下午5点执行.更多的资料可以去查百度.或者等以后有机会我再整理一写常用jar用法系列文章.就这样了.
打赏
如果觉得我的文章写的还过得去的话,有钱就捧个钱场,没钱给我捧个人场(帮我点赞或推荐一下)
12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析
文章目录
- ScheduledThreadPoolExecutor 介绍
- 简单使用
- 核心内容
- ScheduledFutureTask - 任务
- DelayedWorkQueue - 队列
- 源码分析
- execute()
- schedule()
- scheduleAtFixedRate()
- scheduleWithFixedDelay()
- run()
@L_301_11@ScheduledThreadPoolExecutor 介绍
ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。
简单使用
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
//1. execute
// 和普通线程池执行一样
executor.execute(() -> {
System.out.println("execute");
});
//2. schedule
// 指定延迟时间,一次性执行任务
executor.schedule(() -> {
System.out.println("schedule");
},2000,TimeUnit.MILLISECONDS);
//3. AtFixedrate
// 周期性执行任务(周期时间:执行时间和延迟时间的最大值)
executor.scheduleAtFixedrate(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("at:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
//4. WithFixedDelay
// 周期性执行任务(周期时间:执行时间 + 延迟时间)
executor.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("with:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
}
核心内容
ScheduledFutureTask - 任务
ScheduledFutureTask 实现了 RunnableScheduledFuture 接口,间接的实现了 Delayed 接口,让任务可以放到延迟队列中,并且基于二叉堆做排序
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** 计数器,每个任务都有一个全局唯一的序号
如果任务的执行时间一模一样,比对sequenceNumber */
private final long sequenceNumber;
/** 任务执行的时间,单位是纳秒 */
private long time;
/*
* period == 0:表示一次性执行的任务
* period > 0:表示使用的是 scheduleAtFixedrate
* period < 0:表示使用的是 scheduleWithFixedDelay
* */
private final long period;
/** 周期性执行任务时,引用具体任务,方便后面重新扔到阻塞队列 */
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;
// 实现Delayed接口重写的方法,执行时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - Now(), NANOSECONDS);
}
// 实现Delayed接口重写的方法,比较方式
public int compareto(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
DelayedWorkQueue - 队列
DelayedWorkQueue 包装了 RunnableScheduledFuture<?>[]
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 初始长度
private static final int INITIAL_CAPACITY = 16;
// 数组
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 锁
private final reentrantlock lock = new reentrantlock();
// 长度
private int size = 0;
// 等待拿堆顶数据的线程
private Thread leader = null;
// Condition 队列
private final Condition available = lock.newCondition();
}
源码分析
execute()
直接调用 schedule(),入参 delay = 0
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
schedule()
延迟一段时间,执行一次任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
// 非空校验
if (callable == null || unit == null)
throw new NullPointerException();
// 将任务封装成 ScheduledFutureTask
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 延迟执行任务
delayedExecute(t);
return t;
}
// 返回当前任务要执行的系统时间
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return Now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
delayedExecute() 延迟执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池状态不是RUNNING,就拒绝任务
if (isShutdown())
reject(task);
else {
// 将任务放入延迟队列中(二叉堆)
super.getQueue().add(task);
// 1.线程池状态
// 2.根据策略决定是否能执行
// 3.将任务从延迟队列移除
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 是否需要创建线程
ensurePrestart();
}
}
// periodic - true:代表是周期性执行的任务
// periodic - false:代表是一次性的延迟任务
boolean canRunInCurrentRunState(boolean periodic) {
// 默认情况下,如果任务扔到了延迟队列中,有两个策略
// 如果任务是周期性执行的,默认为false(continueExistingPeriodicTasksAfterShutdown)
// 如果任务是一次性的延迟任务,默认为true(executeExistingDelayedTasksAfterShutdown)
// 此时,周期性任务返回false,一次性任务返回true
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
// 重新拿到线程池的ctl
int rs = runStateOf(ctl.get());
// 如果线程池是RUNNING,返回true
// 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
// 是否需要创建线程
void ensurePrestart() {
// 获取线程池中的工作线程个数。
int wc = workerCountOf(ctl.get());
// 如果工作线程个数,小于核心线程数,
if (wc < corePoolSize)
// 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
addWorker(null, true);
// 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
else if (wc == 0)
// 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
addWorker(null, false);
}
scheduleAtFixedrate()
scheduleAtFixedrate 在包装 ScheduledFutureTask 时会将 period 设置为正数,代表固定周期执行
public ScheduledFuture<?> scheduleAtFixedrate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将任务设置给outerTask属性,方便后期重新扔到延迟队列
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t;
}
scheduleWithFixedDelay()
scheduleWithFixedDelay 在包装 ScheduledFutureTask 时会将 period 设置为负数,代表在执行任务完毕后,再计算下次执行的时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将任务设置给outerTask属性,方便后期重新扔到延迟队列
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t;
}
run()
执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。
// 执行任务
public void run() {
// 获取任务是否是周期执行
// true:周期执行
// false:一次的延迟执行
boolean periodic = isPeriodic();
// 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
if (!canRunInCurrentRunState(periodic))
// 取消任务
cancel(false);
else if (!periodic)
// 当前任务是一次性的延迟执行。执行任务具体的run方法
ScheduledFutureTask.super.run();
// 周期性任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次任务运行时间
setNextRunTime();
// 重新将任务扔到延迟队列中
reExecutePeriodic(outerTask);
}
}
// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
// 拿到当前任务的period
long p = period;
// period > 0:At
if (p > 0)
// 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
time = time + p;
else
// period < 0:With
// 任务执行完,拿当前系统时间计算下次执行的时间点
time = Now() + p;
}
// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 线程池状态的判断
if (canRunInCurrentRunState(true)) {
// 将任务扔到了延迟队列中
super.getQueue().add(task);
// 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 是否需要创建线程
ensurePrestart();
}
}
22. 线程池之 ScheduledThreadPoolExecutor

1. ScheduledThreadPoolExecutor 简介
ScheduledThreadPoolExecutor 可以用来在给定延时后执行异步任务或者周期性执行任务,相对于任务调度的 Timer 来说,其功能更加强大,Timer 只能使用一个后台线程执行任务,而 ScheduledThreadPoolExecutor 则可以通过构造函数来指定后台线程的个数。ScheduledThreadPoolExecutor 类的 UML 图如下:
- 从 UML 图可以看出,ScheduledThreadPoolExecutor 继承了
ThreadPoolExecutor
,也就是说 ScheduledThreadPoolExecutor 拥有 execute () 和 submit () 提交异步任务的基础功能,关于 ThreadPoolExecutor 可以看这篇文章。但是,ScheduledThreadPoolExecutor 类实现了ScheduledExecutorService
,该接口定义了 ScheduledThreadPoolExecutor 能够延时执行任务和周期执行任务的功能; - ScheduledThreadPoolExecutor 也两个重要的内部类:DelayedWorkQueue 和 ScheduledFutureTask。可以看出 DelayedWorkQueue 实现了 BlockingQueue 接口,也就是一个阻塞队列,ScheduledFutureTask 则是继承了 FutureTask 类,也表示该类用于返回异步任务的结果。这两个关键类,下面会具体详细来看。
1.1 构造方法
ScheduledThreadPoolExecutor 有如下几个构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
};
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
可以看出由于 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,它的构造方法实际上是调用了 ThreadPoolExecutor,对 ThreadPoolExecutor 的介绍可以可以看这篇文章,理解 ThreadPoolExecutor 构造方法的几个参数的意义后,理解这就很容易了。可以看出,ScheduledThreadPoolExecutor 的核心线程池的线程个数为指定的 corePoolSize,当核心线程池的线程个数达到 corePoolSize 后,就会将任务提交给有界阻塞队列 DelayedWorkQueue,对 DelayedWorkQueue 在下面进行详细介绍,线程池允许最大的线程个数为 Integer.MAX_VALUE,也就是说理论上这是一个大小无界的线程池。
1.2 特有方法
ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService
接口,该接口定义了可延时执行异步任务和可周期执行异步任务的特有功能,相应的方法分别为:
//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
//因此通过ScheduledFuture.get()获取结果为null
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
//因此,返回的是任务的最终计算结果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//是以上一个任务开始的时间计时,period时间过去后,
//检测上一个任务是否执行完毕,如果上一个任务执行完毕,
//则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
2. 可周期性执行的任务 ---ScheduledFutureTask
ScheduledThreadPoolExecutor 最大的特色是能够周期性执行异步任务,当调用 schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法
时,实际上是将提交的任务转换成的 ScheduledFutureTask 类,从源码就可以看出。以 schedule 方法为例:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
可以看出,通过 decorateTask
会将传入的 Runnable 转换成 ScheduledFutureTask
类。线程池最大作用是将任务和线程进行解耦,线程主要是任务的执行者,而任务也就是现在所说的 ScheduledFutureTask。紧接着,会想到任何线程执行任务,总会调用 run()
方法。为了保证 ScheduledThreadPoolExecutor 能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask 重写了 run 方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//如果不是周期性执行任务,则直接调用run方法
ScheduledFutureTask.super.run();
//如果是周期性执行任务的话,需要重设下一次执行任务的时间
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
从源码可以很明显的看出,在重写的 run 方法中会先 if (!periodic)
判断当前任务是否是周期性任务,如果不是的话就直接调用 run()方法
;否则的话执行 setNextRunTime()
方法重设下一次任务执行的时间,并通过 reExecutePeriodic(outerTask)
方法将下一次待执行的任务放置到 DelayedWorkQueue
中。
因此,可以得出结论:ScheduledFutureTask
最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用 schedule 方法)则直接通过 run()
执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。
3. DelayedWorkQueue
在 ScheduledThreadPoolExecutor 中还有另外的一个重要的类就是 DelayedWorkQueue。为了实现其 ScheduledThreadPoolExecutor 能够延时执行异步任务以及能够周期执行任务,DelayedWorkQueue 进行相应的封装。DelayedWorkQueue 是一个基于堆的数据结构,类似于 DelayQueue 和 PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以 DelayedWorkQueue 的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
为什么要使用 DelayedWorkQueue 呢?
定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
DelayedWorkQueue 是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O (logN)。
DelayedWorkQueue 的数据结构
//初始大小
private static final int INITIAL_CAPACITY = 16;
//DelayedWorkQueue是由一个大小为16的数组组成,数组元素为实现RunnableScheduleFuture接口的类
//实际上为ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
可以看出 DelayedWorkQueue 底层是采用数组构成的,关于 DelayedWorkQueue 可以看这篇博主的文章,很详细。
关于 DelayedWorkQueue 我们可以得出这样的结论:DelayedWorkQueue 是基于堆的数据结构,按照时间顺序将每个任务进行排序,将待执行时间越近的任务放在在队列的队头位置,以便于最先进行执行。
4.ScheduledThreadPoolExecutor 执行过程
现在我们对 ScheduledThreadPoolExecutor 的两个内部类 ScheduledFutueTask 和 DelayedWorkQueue 进行了了解,实际上这也是线程池工作流程中最重要的两个关键因素:任务以及阻塞队列。现在我们来看下 ScheduledThreadPoolExecutor 提交一个任务后,整体的执行过程。以 ScheduledThreadPoolExecutor 的 schedule 方法为例,具体源码为:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//将提交的任务转换成ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//延时执行任务ScheduledFutureTask
delayedExecute(t);
return t;
}
方法很容易理解,为了满足 ScheduledThreadPoolExecutor 能够延时执行任务和能周期执行任务的特性,会先将实现 Runnable 接口的类转换成 ScheduledFutureTask。然后会调用 delayedExecute
方法进行执行任务,这个方法也是关键方法,来看下源码:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//如果当前线程池已经关闭,则拒绝任务
reject(task);
else {
//将任务放入阻塞队列中
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//保证至少有一个线程启动,即使corePoolSize=0
ensurePrestart();
}
}
delayedExecute
方法的主要逻辑请看注释,可以看出该方法的重要逻辑会是在 ensurePrestart()
方法中,它的源码为:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
可以看出该方法逻辑很简单,关键在于它所调用的 addWorker方法
,该方法主要功能:新建 Worker类
,当执行任务时,就会调用被 Worker所重写的run方法
,进而会继续执行 runWorker
方法。在 runWorker
方法中会调用 getTask
方法从阻塞队列中不断的去获取任务进行执行,直到从阻塞队列中获取的任务为 null 的话,线程结束终止。addWorker 方法是 ThreadPoolExecutor 类中的方法,对 ThreadPoolExecutor 的源码分析可以看这篇文章,很详细。
5. 总结
-
ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 类,因此,整体上功能一致,线程池主要负责创建线程(Worker 类),线程从阻塞队列中不断获取新的异步任务,直到阻塞队列中已经没有了异步任务为止。但是相较于 ThreadPoolExecutor 来说,ScheduledThreadPoolExecutor 具有延时执行任务和可周期性执行任务的特性,ScheduledThreadPoolExecutor 重新设计了任务类
ScheduleFutureTask
,ScheduleFutureTask 重写了run
方法使其具有可延时执行和可周期性执行任务的特性。另外,阻塞队列DelayedWorkQueue
是可根据优先级排序的队列,采用了堆的底层数据结构,使得与当前时间相比,待执行时间越靠近的任务放置队头,以便线程能够获取到任务进行执行; -
线程池无论是 ThreadPoolExecutor 还是 ScheduledThreadPoolExecutor,在设计时的三个关键要素是:任务,执行者以及任务结果。它们的设计思想也是完全将这三个关键要素进行了解耦。
执行者
任务的执行机制,完全交由
Worker类
,也就是进一步了封装了 Thread。向线程池提交任务,无论为 ThreadPoolExecutor 的 execute 方法和 submit 方法,还是 ScheduledThreadPoolExecutor 的 schedule 方法,都是先将任务移入到阻塞队列中,然后通过 addWork 方法新建了 Work 类,并通过 runWorker 方法启动线程,并不断的从阻塞对列中获取异步任务执行交给 Worker 执行,直至阻塞队列中无法取到任务为止。任务
在 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 中任务是指实现了 Runnable 接口和 Callable 接口的实现类。ThreadPoolExecutor 中会将任务转换成
FutureTask
类,而在 ScheduledThreadPoolExecutor 中为了实现可延时执行任务和周期性执行任务的特性,任务会被转换成ScheduledFutureTask
类,该类继承了 FutureTask,并重写了 run 方法。任务结果
在 ThreadPoolExecutor 中提交任务后,获取任务结果可以通过 Future 接口的类,在 ThreadPoolExecutor 中实际上为 FutureTask 类,而在 ScheduledThreadPoolExecutor 中则是
ScheduledFutureTask
类
Executors类的newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool
Executors 类对 ThreadPoolExecutor 的构造函数进行了封装,使用该类可方便地创建线程池。
1. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 对应的ThreadPoolExecutor设置如下:
this.corePoolSize = nThreads;
this.maximumPoolSize = nThreads;
this.workQueue = new LinkedBlockingQueue<Runnable>();
this.keepAliveTime = TimeUnit.MILLISECONDS.toNanos(0L);
this.threadFactory = Executors.defaultThreadFactory();
this.handler = defaultHandler;
2. newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 对应的ThreadPoolExecutor设置如下:
this.corePoolSize = 0;
this.maximumPoolSize = Integer.MAX_VALUE;
this.workQueue = new SynchronousQueue<Runnable>();
this.keepAliveTime = TimeUnit.SECONDS.toNanos(60L); //60秒
this.threadFactory = Executors.defaultThreadFactory();
this.handler = defaultHandler;
分析任务入队和出队,分别对应 ThreadPoolExecutor 类的 execute 方法和 getTask 方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// corePoolSize=0,所以不会走这个分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 把任务放进队列。
// newCachedThreadPool使用的队列是SynchronousQueue,当没有线程因take阻塞时,offer返回false。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果工作线程数为0,创建工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果入队失败,则创建工作线程
else if (!addWorker(command, false))
reject(command);
}
cachedThreadPool 的一个特点是:工作线程执行完任务后,继续从工作队列获取任务(poll),等待60秒,超时则返回 null。task为 null时,工作线程就退出了 while 循环,也就是说这个线程要死了。
while (task != null || (task = getTask()) != null) {...}
3. newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//对应ThreadPoolExecutor的设置
this.corePoolSize = corePoolSize;
this.maximumPoolSize = Integer.MAX_VALUE;
this.workQueue = new DelayedWorkQueue();
this.keepAliveTime = TimeUnit.NANOSECONDS.toNanos(0);
this.threadFactory = Executors.defaultThreadFactory();
this.handler = defaultHandler;
分析定时器延迟队列的 take 方法:DelayedWorkQueue 的底层是堆,访问堆顶的任务,如果任务的时间到了,则返回,否则等待直到时间到来。
// ScheduledThreadPoolExecutor.DelayedWorkQueue
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
可以看出,3种线程池的主要区别是使用的队列不同。
4. DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); //设置为前台线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
如果 Runnable 任务抛出了异常,线程池的工作线程还在吗?在的,线程会 terminate 掉,然后添加一个新的 Worker
关于ScheduledThreadPoolExecutor和ScheduledThreadPoolExecutor用法的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于(二十)java多线程之ScheduledThreadPoolExecutor、12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析、22. 线程池之 ScheduledThreadPoolExecutor、Executors类的newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool等相关内容,可以在本站寻找。
本文标签: