最近很多小伙伴都在问JAVA线程池之Executors和二原理分析这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Executor框架(二)Executor与ExecutorSer
最近很多小伙伴都在问JAVA线程池 之 Executors 和二 原理分析这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Executor 框架(二)Executor 与 ExecutorService 两个基本接口、ExecutorService java线程池主线程等待子线程执行完成、Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式、Java executors 创建线程池和使用 ThreadPoolExecutor等相关知识,下面开始了哦!
本文目录一览:- JAVA线程池 之 Executors (二) 原理分析(java 线程池execute)
- Executor 框架(二)Executor 与 ExecutorService 两个基本接口
- ExecutorService java线程池主线程等待子线程执行完成
- Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式
- Java executors 创建线程池和使用 ThreadPoolExecutor
JAVA线程池 之 Executors (二) 原理分析(java 线程池execute)
一、线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING : 该状态的线程池会接收新的任务,并处理阻塞队列中的任务。
SHUTDOWN : 该状态的线程池不会接收新的任务,但会处理阻塞队列中的任务。
STOP : 该状态的线程池不会接收新的任务,也不会处理阻塞队列中的任务,而且会中断正在执行的任务。
二、任务提交 方式
1、execute
提交的任务必须实现Runnable接口,接口不带返回值
public void execute(Runnable command) {
2、submit
父类AbstractExecutorService提供有submit接口,可获取线程执行返回值。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
三、任务执行 -- execute
execute 方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
大致流程为:
1、通过workerCountOf方法得到线程池的当前线程数,如果当前线程数小于corePoolSize,则执行addWorker方法创建一个新的核心线程执行任务。
2、如果当前线程数大于等于corePoolSize时,检查线程池的运行状态,如果线程池运行状态为RUNNING,则尝试将任务加入阻塞队列。
3、再次检查线程池的运行状态,如果运行状态不为RUNNING,则从阻塞队列中删除任务并执行reject方法调用处理机制。
4、在2的基础上,如果加入阻塞队列失败,则会执行addWorker方法创建一个新的非核心线程执行任务。
5、在3的基础上,如果addWorker执行失败,则会调用reject调用处理机制。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
大致流程为:
1、自旋检测线程池状态,如果状态大于SHUTDOWN,或者 firstTask为空 或队列为空 时,返回任务加入队列失败。
2、获取线程池当前线程数,通过core判断是否是创建核心线程,如果为true,并且当前线程数wc小于corePoolSize时,跳出循环创建新的线程。如果core为false,
则判断当前线程数wc是否小于maximumPoolSize,小于跳出循环。
3、线程池的工作线程时候通过Worker实现的,通过ReentrantLock加锁,再次通过线程池状态监测之后,将worker加入到HashSet<Worker> workers 里面
4、如果加入成功,则启动Worker中的线程。
Worker类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker类继承了AbstractQueuedSynchronizer(AQS)类,可以方便的实现工作线程的中止操作。
并且本身实现了Runnable接口,可单独作为任务在工作线程中执行。
runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker流程:
1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2、获取第一个任务firstTask,并执行task的run方法,在执行run方法前,会对Worker加锁,任务执行完释放锁。
3、在任务执行前后,可根据业务自定义实现beforeExecute(wt, task); 和 afterExecute(task, thrown);。
4、任务执行完之后,调用getTask从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用CPU资源。
getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask流程:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
1、如果设定了超时机制,则通过 workQueue.poll()方法来获取阻塞队列中的任务,如果队列中没有任务,则会在keepAliveTime时间后返回null。
2、如果未设置超时机制,并且当前线程数小于核心线程时,同时未设置允许核心线程超时的情况下,通过workQueue.take(); 方法来获取阻塞队列中的任务,如果没有任务,
则会一直等待并挂起,直到有新任务提交时,则会环信等待的队列并返回新的任务。
3、阻塞队列使用生产者与消费者模式,使用等待与唤醒使线程池线程挂起与唤起。
四、任务执行 -- submit
submit重载了多种实现方式
1、Callable
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
2、Runnable
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
在实际业务中,Future和Callable是成双出现的,Callable负责产生结果,Future负责获取结果。
1、Callable类似于Runnable,只是Callable附带返回值。
2、Callable除了正常返回之外,如果线程出现异常,该异常也会返回,即Future的get方法可以获取到异常结果。
3、Future的get()方法会导致主线程阻塞,直到Callable执行完成。
FutureTask
futureTask内部状态
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
FutureTask 实现了Runnable接口,提交的任务可以交由工作线程处理,执行run方法。
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
调用get方法时,如果task的状态处于执行中或初始化,调用awaitDone方法对线程进行阻塞。
awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
通过对Task的状态检测,如果Callable未执行完成,使用 LockSupport.park(this); 对当前线程进行阻塞。等待唤起,并将主线程封装成WaitNode 并存放在 waiters 链表中。
run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
run方法流程:
通过对task的state判断,如果task为初始New状态,则执行call方法,获取call方法返回结果,并调用set方法
如果执行失败,则调用setException方法。
setException方法
设置状态 EXCEPTIONAL
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
set方法
设置状态 NORMAL
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
finishCompletion();方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
如果finishCompletion 检测到 通过get方法被阻塞的线程集 waiters 不为空时,获取的每一个节点,并使用 LockSupport.unpark(t); 对其唤醒。
最终使用report返回结果。
Executor 框架(二)Executor 与 ExecutorService 两个基本接口
一、Executor 接口简介
Executor 接口是 Executor 框架的一个最基本的接口,Executor 框架的大部分类都直接或间接地实现了此接口。
只有一个方法
void execute(Runnable command): 在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。
Executor 的几种实现原理介绍:
1、 <font color="blue">Executor 接口并没有严格地要求执行是异步的。</font > 在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
2、 更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
3、 许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
private final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
二、ExecutorService 接口简介
ExecutorService
是一个接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 ExecutorService 的实现:
- 三个实现类:
AbstractExecutorService
(默认实现类) ,ScheduledThreadPoolExecutor
,ThreadPoolExecutor
Executors
提供了此接口的几种常用实现的工厂方法。
方法摘要
1. 从 Executor 接口中继承了不跟踪异步线程,没有返回的 execute 方法:
void execute(Runnable command):
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。
2. 扩展的跟踪异步线程、返回 Future 接口的实现类的方法:
Future<?> submit(Runnable task):
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。
**<T> Future<T> submit(Runnable task,T result):
** 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
<T> Future<T> submit(Callable<T> task):
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。<font color="blue"> 该 Future 的 get 方法在成功完成时将会返回该任务的结果。如果想立即 < font color="red"> 阻塞任务的等待 </font>,则可以使用 result = exec.submit (aCallable).get (); 形式的构造 </font>
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException:
执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone () 为 true。注意,可以正常地或通过抛出异常来终止已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException:
超时等待,同上。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException:
与 invokeAll 的区别是,<font color="red"> 任务列表里只要有一个任务完成了,就立即返回。而且一旦正常或异常返回后,则取消尚未完成的任务。</font>
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException:
超时等待,同上。
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException:
一直等待,直到所有任务完成。请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行,或者超时时间的到来 <font color="red"> 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false </font>
3. 管理生命周期
void shutdown():
启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 List<Runnable> shutdownNow():
试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,<font color="blue"> 在 ThreadPoolExecutor 中,通过 Thread.interrupt () 来取消典型的实现,所以如果任务无法响应中断,则永远无法终止。</font> boolean isShutdown():
如果此执行程序已关闭,则返回 true。 boolean isTerminated():
如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
用法示例
下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool (int) 工厂方法:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
<br/>
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
参考文献
- 《java 并发编程的艺术》
ExecutorService java线程池主线程等待子线程执行完成
java1.5及1.6中通过ExecutorService提供了线程池的支持。我们可以按照如下的方法建立10个线程容量的线程池:
ExecutorService exec = Executors.newFixedThreadPool(10);
for (i = 0; i < taskAmount; i++)
exec.execute(new Task(...)); // 或者new Thread(...)
这样就把所有的任务提交给了线程池,由exec负责调度线程的执行,切换。
我们还应该加上下面一行代码:
exec.shutdown();
这样线程池在任务结束之后才会终止。我的问题是这样的:线程池的使用只是主线程的一个中间过程,而我必须等线程池中止之后才能继续后续的操作。所以我在上面的代码后面加上线程池中止判断语句,如果线程池仍在工作,主线程sleep。
while (!exec.isTerminated())
{
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printstacktrace();
}
}
可是我总觉得这不是最好的中止判断方式,至于哪里不好么……我也不清楚,只是觉得java应该提供更友好的接口,比如说可以让主线程等待线程池结束的方法,但是我没有找到。因此请教一下各路同道,有没有一种更好的方式实现呢? 先行谢过哦……
==============================
public class MasstestClient {
/
private static int threadPoolSize = 10; // Todo configurable
private static String testCasesFile = "./service/client/config/pc/testCases.dat";
private static DateFormat dformat = new SimpleDateFormat("MM/dd/yyyy");
public static void main(String args[]) {
try {
....
ExecutorService exec = Executors.newFixedThreadPool(threadPoolSize, testClient.new TestClientThreadFactory());
List<String[]> list = readByBufferReader(testCasesFile);
for (String[] str : list) {
exec.execute(testClient.new testClientThread(tmlModule, str));
}
exec.shutdown()
;//Initiates an orderly shutdown in which prevIoUsly submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
//This method does not wait for prevIoUsly submitted tasks to complete execution
while (!exec.isTerminated()) {
//Returns true if all tasks have completed following shut down.
// Note that isTerminated is never true unless either shutdown or shutdownNow was called first.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printstacktrace();
}
}
System.out.println("Execution complete");
} catch (Exception e) {
e.printstacktrace();
} finally {
System.exit(0);
}
}
private class testClientThread implements Runnable {
private String code;
private String id;
private String country;
private Date date;
private OPSTechModule module;
public testClientThread(OPSTechModule module, String[] str) throws Exception {
this.code = str[0];
this.id = str[1];
this.country = str[2];
this.date = dformat.parse(str[3]);
this.module = module;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + code + "|" + id + "|" + country + "|" + date);
ISGSecurityDocument result = module.tmlSearch(code, id, country, date);
if (result != null)
System.out.println(result.toString());
}
}
private class TestClientThreadFactory implements ThreadFactory {
int counter = 0;
@Override
public Thread newThread(Runnable runnable) {
Thread thr = new Thread(runnable);
++counter;
thr.setName("testClient-" + counter);
return thr;
}
}
public static List<String[]> readByBufferReader(String file) {
List<String[]> list = new ArrayList<String[]>();
try {
BufferedReader br = new BufferedReader(new FileReader(new File(file)));
String line;
while ((line = br.readLine()) != null && !line.trim().isEmpty()) {
// System.out.println(line);
String str[] = line.trim().split("\|");
list.add(str);
}
br.close();
} catch (Exception ex) {
ex.printstacktrace();
}
return list;
}
}
Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式
本文已同步至个人博客liaosi''s blog-Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式
ThreadPoolExecutor
ThreadPoolExecutor是Executor框架最重要的一个类,它即是真正意义上的线程池。该类的源码有两千多行,但大部分是注释说明,而且还有一些private/protected的方法,真正会用到的方法也并不太多。
先了解一下它的构造器。
ThreadPoolExecutor的构造器
ThreadPoolExecutor的构造器有4个重载的构造器,其中有两个ThreadFactory和RejectedExecutionHandler类型的参数是可选的。最完整的构造器如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造器的各个参数说明:
-
corePoolSize:核心线程数,核心线程会一直存活,即使没有任务需要处理。但如果设置了
allowCoreThreadTimeOut
`为 true 则核心线程也会超时退出。 - maximumPoolSize:最大线程数,线程池中可允许创建的最大线程数。
-
keepAliveTime:当线程池中的线程数大于核心线程数,那些多余的线程空闲时间达到keepAliveTime后就会退出,直到线程数量等于corePoolSize。如果设置了
allowCoreThreadTimeout
设置为true,则所有线程均会退出直到线程数量为0。 - unit:keepAliveTime参数的时间单位
- workQueue:在任务执行前用来保存任务的 阻塞队列。这个队列只会保存通过execute方法提交到线程池的Runnable任务。在ThreadPoolExecutor线程池的API文档中,一共推荐了三种等待队列,它们是:SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue。
- threadFactory:线程池创建新线程时使用的factory。默认使用defaultThreadFactory创建线程。
- handle:饱和策略。当线程池的线程数已达到最大,并且任务队列已满时来处理被拒绝任务的策略。默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException
除此之外,ThreadPoolExecutor还有两个个常用的参数设置:
- allowCoreThreadTimeout:是否允许核心线程空闲退出,默认值为false。
- queueCapacity:任务队列的容量。
ThreadPoolExecutor线程池的逻辑结构图:
线程池执行任务的行为方式
线程池按以下行为执行任务
1. 当线程数小于核心线程数时,创建线程。
2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
3. 当线程数大于等于核心线程数,且任务队列已满
1. 若线程数小于最大线程数,创建线程
2. 若线程数等于最大线程数,抛出异常,拒绝任务
Executors
Executors类是一个工厂类,提供了一系列静态工厂方法来创建不同的ExecutorService或 ScheduledExecutorService实例。
创建3种不同的ExecutorService(线程池)实例
1.newSingleThreadExecutor
创建一个单线程的线程池:启动一个线程负责按顺序执行任务,先提交的任务先执行。
其原理是:任务会被提交到一个队列里,启动的那个线程会从队里里取任务,然后执行,执行完,再从队列里取下一个任务,再执行。如果该线程执行一个任务失败,并导致线程结束,系统会创建一个新的线程去执行队列里后续的任务,不会因为前面的任务有异常导致后面无辜的任务无法执行。
源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2.newFixedThreadPool
创建一个可重用的固定线程数量的线程池。即corePoolSize=线程池中的线程数= maximumPoolSize。
- 如果没有任务执行,所有的线程都将等待。
- 如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程。
- 如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。
源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3.newCachedThreadPool
创建一个不限制线程数量的动态线程池。
- 因为有多个线程存在,任务不一定会按照顺序执行。
- 一个线程完成任务后,空闲时间达到60秒则会被结束。
- 在执行新的任务时,当线程池中有之前创建的空闲线程就使用这个线程,否则就新建一条线程。
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到newCachedThreadPool
使用的队列是SynchronousQueue
,和前两个是不一样的。线程池的线程数可达到Integer.MAX_VALUE,即2147483647。此外由于会有线程的创建和销毁,所以会有一定的系统开销。
4.newSingleThreadExecutor 与 newFixedThreadPool(1) 的区别
JavaDoc上说:
Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
举个例子:
((ThreadPoolExecutor)newFixedThreadPool(1)).setCorePoolSize(3);
即newFixedThreadPool(1)
可以后期修改线程数,不保证线程只有一个。而newSingleThreadExecutor
可以保证。
创建ScheduledExecutorService实例
关于ScheduledExecutorService的内容,在下一篇文章中介绍。
1.newSingleThreadScheduledExecutor
创建一个单线程的ScheduledExecutorService,在指定延时之后执行或者以固定的频率周期性的执行提交的任务。在线程池关闭之前如果有一个任务执行失败,并导致线程结束,系统会创建一个新的线程接着执行队列里的任务。
源码:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1)); //corePoolSize为1
}
还有一个重载的方法,多了一个ThreadFactory参数,ThreadFactory是用来确定新线程应该怎么创建的。
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
2.newScheduledThreadPool
创建一个固定线程数的ScheduledExecutorService对象,在指定延时之后执行或者以固定的频率周期性的执行提交的任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
同样的,也有一个重载的方法:
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
Java executors 创建线程池和使用 ThreadPoolExecutor
Java.util.concurrent 包下
executors 创建线程池
1. Executors.newFixedThreadPool()
创建一个定长的线程池,每提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不再变化
固定数量的核心线程
虽然线程数量是固定的,但是阻塞队列是无界队列。如果有很多请求积压,阻塞队列越来越长,容易导致 OOM
(无界队列其实是用了默认参数 Integer.MAX_VALUE, 一般来说,不应该允许那么多请求等待)
2. Executors.newCachedThreadPool()
创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要时,它可以灵活的回收空闲的线程,当需要增加时, 它可以灵活的添加新的线程,而不会对池的长度作任何限制
无核心线程
线程数量不固,和 1. 一样请求一多,容易 OOM
3. Executors.newScheduledThreadPool()
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于 Timer
和 1. 类似,支持定时和周期性任务执行
也是无界队列
4. Executors.newSingleThreadExecutor()
创建一个单线程化的 executor,它只创建唯一的 worker 线程来执行任务
必须前一项任务执行完毕才能执行后一项。
(核心线程和非核心线程的区别,没有区别,所谓核心线程是指线程不被销毁的数值范围,而哪些线程被销毁是随机的)
概括:高并发情况无界队列会有 OOM 的风险,所以推荐做法是使用 ThreadPoolExecutor(定时和周期性任务使用 ScheduledThreadPoolExecutor)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ThreadPoolExecutor
与给定的初始参数和默认线程工厂和拒绝执行处理程序。
corePoolSize 核心线程数量
maximumPoolSize 最大线程数量
keepAliveTime 线程保持时间,N 个时间单位
unit 时间单位(比如秒,分)
workQueue 阻塞队列
threadFactory 线程工厂
handler 线程池拒绝策略
今天关于JAVA线程池 之 Executors 和二 原理分析的分享就到这里,希望大家有所收获,若想了解更多关于Executor 框架(二)Executor 与 ExecutorService 两个基本接口、ExecutorService java线程池主线程等待子线程执行完成、Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式、Java executors 创建线程池和使用 ThreadPoolExecutor等相关知识,可以在本站进行查询。
本文标签: