GVKun编程网logo

JAVA线程池 之 Executors (二) 原理分析(java 线程池execute)

13

最近很多小伙伴都在问JAVA线程池之Executors和二原理分析这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Executor框架(二)Executor与ExecutorSer

最近很多小伙伴都在问JAVA线程池 之 Executors 二 原理分析这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Executor 框架(二)Executor 与 ExecutorService 两个基本接口、ExecutorService java线程池主线程等待子线程执行完成、Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式、Java executors 创建线程池和使用 ThreadPoolExecutor等相关知识,下面开始了哦!

本文目录一览:

JAVA线程池 之 Executors (二) 原理分析(java 线程池execute)

JAVA线程池 之 Executors (二) 原理分析(java 线程池execute)

一、线程池状态

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  RUNNING :  该状态的线程池会接收新的任务,并处理阻塞队列中的任务。

  SHUTDOWN : 该状态的线程池不会接收新的任务,但会处理阻塞队列中的任务。

  STOP : 该状态的线程池不会接收新的任务,也不会处理阻塞队列中的任务,而且会中断正在执行的任务。

 

二、任务提交 方式

   1、execute

    提交的任务必须实现Runnable接口,接口不带返回值

public void execute(Runnable command) {

 

   2、submit

      父类AbstractExecutorService提供有submit接口,可获取线程执行返回值。    

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

 

三、任务执行 -- execute

  execute  方法

  

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

 

  大致流程为:

    1、通过workerCountOf方法得到线程池的当前线程数,如果当前线程数小于corePoolSize,则执行addWorker方法创建一个新的核心线程执行任务。

    2、如果当前线程数大于等于corePoolSize时,检查线程池的运行状态,如果线程池运行状态为RUNNING,则尝试将任务加入阻塞队列。

    3、再次检查线程池的运行状态,如果运行状态不为RUNNING,则从阻塞队列中删除任务并执行reject方法调用处理机制。

    4、在2的基础上,如果加入阻塞队列失败,则会执行addWorker方法创建一个新的非核心线程执行任务。

    5、在3的基础上,如果addWorker执行失败,则会调用reject调用处理机制。

 

  addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    大致流程为:

      1、自旋检测线程池状态,如果状态大于SHUTDOWN,或者 firstTask为空 或队列为空 时,返回任务加入队列失败。

      2、获取线程池当前线程数,通过core判断是否是创建核心线程,如果为true,并且当前线程数wc小于corePoolSize时,跳出循环创建新的线程。如果core为false,

        则判断当前线程数wc是否小于maximumPoolSize,小于跳出循环。

      3、线程池的工作线程时候通过Worker实现的,通过ReentrantLock加锁,再次通过线程池状态监测之后,将worker加入到HashSet<Worker> workers 里面

      4、如果加入成功,则启动Worker中的线程。

 

 

  Worker类

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

 

    Worker类继承了AbstractQueuedSynchronizer(AQS)类,可以方便的实现工作线程的中止操作。

    并且本身实现了Runnable接口,可单独作为任务在工作线程中执行。

 

  runWorker 方法

  

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  runWorker流程:

  1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;

  2、获取第一个任务firstTask,并执行task的run方法,在执行run方法前,会对Worker加锁,任务执行完释放锁。

  3、在任务执行前后,可根据业务自定义实现beforeExecute(wt, task); 和 afterExecute(task, thrown);。

  4、任务执行完之后,调用getTask从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用CPU资源。

 

  getTask方法

  

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

  getTask流程:

   

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

  1、如果设定了超时机制,则通过 workQueue.poll()方法来获取阻塞队列中的任务,如果队列中没有任务,则会在keepAliveTime时间后返回null。

  2、如果未设置超时机制,并且当前线程数小于核心线程时,同时未设置允许核心线程超时的情况下,通过workQueue.take(); 方法来获取阻塞队列中的任务,如果没有任务,

    则会一直等待并挂起,直到有新任务提交时,则会环信等待的队列并返回新的任务。

  3、阻塞队列使用生产者与消费者模式,使用等待与唤醒使线程池线程挂起与唤起。

 


四、任务执行 -- submit

 

    submit重载了多种实现方式

    1、Callable 

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    2、Runnable

  

public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

 

  在实际业务中,Future和Callable是成双出现的,Callable负责产生结果,Future负责获取结果。

  1、Callable类似于Runnable,只是Callable附带返回值。

  2、Callable除了正常返回之外,如果线程出现异常,该异常也会返回,即Future的get方法可以获取到异常结果。

  3、Future的get()方法会导致主线程阻塞,直到Callable执行完成。

 

  FutureTask

    

  futureTask内部状态

* Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    FutureTask 实现了Runnable接口,提交的任务可以交由工作线程处理,执行run方法。

  get方法

  

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

  调用get方法时,如果task的状态处于执行中或初始化,调用awaitDone方法对线程进行阻塞。

  

  awaitDone方法

  

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

  通过对Task的状态检测,如果Callable未执行完成,使用  LockSupport.park(this); 对当前线程进行阻塞。等待唤起,并将主线程封装成WaitNode 并存放在 waiters 链表中。

 

  run方法

  

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    run方法流程:

      通过对task的state判断,如果task为初始New状态,则执行call方法,获取call方法返回结果,并调用set方法

      如果执行失败,则调用setException方法。

    

setException方法

   设置状态  EXCEPTIONAL

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

 

  

   set方法  

    设置状态  NORMAL

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  finishCompletion();方法

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

  如果finishCompletion 检测到 通过get方法被阻塞的线程集 waiters 不为空时,获取的每一个节点,并使用   LockSupport.unpark(t); 对其唤醒。

  最终使用report返回结果。

Executor 框架(二)Executor 与 ExecutorService 两个基本接口

Executor 框架(二)Executor 与 ExecutorService 两个基本接口

一、Executor 接口简介

Executor 接口是 Executor 框架的一个最基本的接口,Executor 框架的大部分类都直接或间接地实现了此接口。

只有一个方法

void execute(Runnable command): 在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。

Executor 的几种实现原理介绍:

1、 <font color="blue">Executor 接口并没有严格地要求执行是异步的。</font > 在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

2、 更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

3、 许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。

 class SerialExecutor implements Executor {
     private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
     private  final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) {
         this.executor = executor;
     }

     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }

 }

二、ExecutorService 接口简介

  ExecutorService 是一个接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 ExecutorService 的实现:

  • 三个实现类:AbstractExecutorService(默认实现类) , ScheduledThreadPoolExecutor, ThreadPoolExecutor
  • Executors 提供了此接口的几种常用实现的工厂方法。

方法摘要

1. 从 Executor 接口中继承了不跟踪异步线程,没有返回的 execute 方法:

void execute(Runnable command):
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。

2. 扩展的跟踪异步线程、返回 Future 接口的实现类的方法:

Future<?> submit(Runnable task): 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。

**<T> Future<T> submit(Runnable task,T result): ** 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。

<T> Future<T> submit(Callable<T> task): 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。<font color="blue"> 该 Future 的 get 方法在成功完成时将会返回该任务的结果。如果想立即 < font color="red"> 阻塞任务的等待 </font>,则可以使用 result = exec.submit (aCallable).get (); 形式的构造 </font>

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException: 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone () 为 true。注意,可以正常地或通过抛出异常来终止已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException: 超时等待,同上。

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException: 与 invokeAll 的区别是,<font color="red"> 任务列表里只要有一个任务完成了,就立即返回。而且一旦正常或异常返回后,则取消尚未完成的任务。</font>

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException: 超时等待,同上。

boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException: 一直等待,直到所有任务完成。请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行,或者超时时间的到来 <font color="red"> 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false </font>

3. 管理生命周期

void shutdown(): 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 List<Runnable> shutdownNow(): 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,<font color="blue"> 在 ThreadPoolExecutor 中,通过 Thread.interrupt () 来取消典型的实现,所以如果任务无法响应中断,则永远无法终止。</font> boolean isShutdown(): 如果此执行程序已关闭,则返回 true。 boolean isTerminated(): 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。

用法示例

下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool (int) 工厂方法:

class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
 
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }
 

<br/>

下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

参考文献

  • 《java 并发编程的艺术》

ExecutorService java线程池主线程等待子线程执行完成

ExecutorService java线程池主线程等待子线程执行完成

 

java1.5及1.6中通过ExecutorService提供了线程池的支持。我们可以按照如下的方法建立10个线程容量的线程池:

            ExecutorService exec = Executors.newFixedThreadPool(10);
            for (i = 0; i < taskAmount; i++)
                          exec.execute(new Task(...)); // 或者new Thread(...)


这样就把所有的任务提交给了线程池,由exec负责调度线程的执行,切换。
我们还应该加上下面一行代码:

            exec.shutdown();


这样线程池在任务结束之后才会终止。我的问题是这样的:线程池的使用只是主线程的一个中间过程,而我必须等线程池中止之后才能继续后续的操作。所以我在上面的代码后面加上线程池中止判断语句,如果线程池仍在工作,主线程sleep。

            while (!exec.isTerminated())
            {
                  try
                  {
                         Thread.sleep(1000);
                  } catch (InterruptedException e)
                  {
                         e.printstacktrace();
                  }
            }


可是我总觉得这不是最好的中止判断方式,至于哪里不好么……我也不清楚,只是觉得java应该提供更友好的接口,比如说可以让主线程等待线程池结束的方法,但是我没有找到。因此请教一下各路同道,有没有一种更好的方式实现呢? 先行谢过哦……

==============================
public class MasstestClient {

    /
    private static int threadPoolSize = 10; // Todo configurable
    private static String testCasesFile = "./service/client/config/pc/testCases.dat";
    private static DateFormat dformat = new SimpleDateFormat("MM/dd/yyyy");

    public static void main(String args[]) {
        try {
             ....

            ExecutorService exec = Executors.newFixedThreadPool(threadPoolSize, testClient.new TestClientThreadFactory());

            List<String[]> list = readByBufferReader(testCasesFile);

            for (String[] str : list) {
                exec.execute(testClient.new testClientThread(tmlModule, str));
            }

            exec.shutdown() ;//Initiates an orderly shutdown in which prevIoUsly submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
            //This method does not wait for prevIoUsly submitted tasks to complete execution


            while (!exec.isTerminated()) { //Returns true if all tasks have completed following shut down.
                         // Note that isTerminated is never true unless either shutdown or shutdownNow was called first.


                try {
                    Thread.sleep(1000);

                } catch (InterruptedException e) {
                    e.printstacktrace();
                }
            }

            System.out.println("Execution complete");
        } catch (Exception e) {
            e.printstacktrace();
        } finally {
            System.exit(0);
        }
    }

    private class testClientThread implements Runnable {
        private String code;
        private String id;
        private String country;
        private Date date;
        private OPSTechModule module;

        public testClientThread(OPSTechModule module, String[] str) throws Exception {
            this.code = str[0];
            this.id = str[1];
            this.country = str[2];
            this.date = dformat.parse(str[3]);
            this.module = module;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "  " + code + "|" + id + "|" + country + "|" + date);
            ISGSecurityDocument result = module.tmlSearch(code, id, country, date);

            if (result != null)
                System.out.println(result.toString());
        }

    }

    private class TestClientThreadFactory implements ThreadFactory {
        int counter = 0;

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thr = new Thread(runnable);
            ++counter;
            thr.setName("testClient-" + counter);
            return thr;
        }
    }

    public static List<String[]> readByBufferReader(String file) {
        List<String[]> list = new ArrayList<String[]>();
        try {
            BufferedReader br = new BufferedReader(new FileReader(new File(file)));
            String line;
            while ((line = br.readLine()) != null && !line.trim().isEmpty()) {
                // System.out.println(line);
                String str[] = line.trim().split("\|");
                list.add(str);
            }
            br.close();
        } catch (Exception ex) {
            ex.printstacktrace();
        }
        return list;
    }

}


Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式

Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式

本文已同步至个人博客liaosi''s blog-Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式

ThreadPoolExecutor

ThreadPoolExecutor是Executor框架最重要的一个类,它即是真正意义上的线程池。该类的源码有两千多行,但大部分是注释说明,而且还有一些private/protected的方法,真正会用到的方法也并不太多。

先了解一下它的构造器。

ThreadPoolExecutor的构造器

ThreadPoolExecutor的构造器有4个重载的构造器,其中有两个ThreadFactory和RejectedExecutionHandler类型的参数是可选的。最完整的构造器如下:

     /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */   
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造器的各个参数说明:

  • corePoolSize:核心线程数,核心线程会一直存活,即使没有任务需要处理。但如果设置了allowCoreThreadTimeOut `为 true 则核心线程也会超时退出。
  • maximumPoolSize:最大线程数,线程池中可允许创建的最大线程数。
  • keepAliveTime:当线程池中的线程数大于核心线程数,那些多余的线程空闲时间达到keepAliveTime后就会退出,直到线程数量等于corePoolSize。如果设置了allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
  • unit:keepAliveTime参数的时间单位
  • workQueue:在任务执行前用来保存任务的 阻塞队列。这个队列只会保存通过execute方法提交到线程池的Runnable任务。在ThreadPoolExecutor线程池的API文档中,一共推荐了三种等待队列,它们是:SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue。
  • threadFactory:线程池创建新线程时使用的factory。默认使用defaultThreadFactory创建线程。
  • handle:饱和策略。当线程池的线程数已达到最大,并且任务队列已满时来处理被拒绝任务的策略。默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

除此之外,ThreadPoolExecutor还有两个个常用的参数设置:

  • allowCoreThreadTimeout:是否允许核心线程空闲退出,默认值为false。
  • queueCapacity:任务队列的容量。

ThreadPoolExecutor线程池的逻辑结构图:
图片描述

线程池执行任务的行为方式

线程池按以下行为执行任务

1. 当线程数小于核心线程数时,创建线程。
2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务

图片描述

Executors

Executors类是一个工厂类,提供了一系列静态工厂方法来创建不同的ExecutorService或 ScheduledExecutorService实例。

创建3种不同的ExecutorService(线程池)实例

1.newSingleThreadExecutor

创建一个单线程的线程池:启动一个线程负责按顺序执行任务,先提交的任务先执行。

其原理是:任务会被提交到一个队列里,启动的那个线程会从队里里取任务,然后执行,执行完,再从队列里取下一个任务,再执行。如果该线程执行一个任务失败,并导致线程结束,系统会创建一个新的线程去执行队列里后续的任务,不会因为前面的任务有异常导致后面无辜的任务无法执行。
源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

2.newFixedThreadPool

创建一个可重用的固定线程数量的线程池。即corePoolSize=线程池中的线程数= maximumPoolSize。

  • 如果没有任务执行,所有的线程都将等待。
  • 如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程。
  • 如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。

源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.newCachedThreadPool

创建一个不限制线程数量的动态线程池。

  • 因为有多个线程存在,任务不一定会按照顺序执行。
  • 一个线程完成任务后,空闲时间达到60秒则会被结束。
  • 在执行新的任务时,当线程池中有之前创建的空闲线程就使用这个线程,否则就新建一条线程。

源码:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到newCachedThreadPool使用的队列是SynchronousQueue,和前两个是不一样的。线程池的线程数可达到Integer.MAX_VALUE,即2147483647。此外由于会有线程的创建和销毁,所以会有一定的系统开销。

4.newSingleThreadExecutor 与 newFixedThreadPool(1) 的区别

JavaDoc上说:

Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

举个例子:

((ThreadPoolExecutor)newFixedThreadPool(1)).setCorePoolSize(3);

newFixedThreadPool(1)可以后期修改线程数,不保证线程只有一个。而newSingleThreadExecutor可以保证。

创建ScheduledExecutorService实例

关于ScheduledExecutorService的内容,在下一篇文章中介绍。

1.newSingleThreadScheduledExecutor

创建一个单线程的ScheduledExecutorService,在指定延时之后执行或者以固定的频率周期性的执行提交的任务。在线程池关闭之前如果有一个任务执行失败,并导致线程结束,系统会创建一个新的线程接着执行队列里的任务。

源码:

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));   //corePoolSize为1
    }

还有一个重载的方法,多了一个ThreadFactory参数,ThreadFactory是用来确定新线程应该怎么创建的。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

2.newScheduledThreadPool

创建一个固定线程数的ScheduledExecutorService对象,在指定延时之后执行或者以固定的频率周期性的执行提交的任务。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

同样的,也有一个重载的方法:

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

Java executors 创建线程池和使用 ThreadPoolExecutor

Java executors 创建线程池和使用 ThreadPoolExecutor

Java.util.concurrent 包下

executors 创建线程池

1. Executors.newFixedThreadPool()

   创建一个定长的线程池,每提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不再变化

固定数量的核心线程

虽然线程数量是固定的,但是阻塞队列是无界队列。如果有很多请求积压,阻塞队列越来越长,容易导致 OOM

(无界队列其实是用了默认参数 Integer.MAX_VALUE, 一般来说,不应该允许那么多请求等待)

 

 

 

2. Executors.newCachedThreadPool()

   创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要时,它可以灵活的回收空闲的线程,当需要增加时,  它可以灵活的添加新的线程,而不会对池的长度作任何限制

无核心线程

线程数量不固,和 1. 一样请求一多,容易 OOM

 

3. Executors.newScheduledThreadPool()

   创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于 Timer

1. 类似,支持定时和周期性任务执行

也是无界队列

 

4. Executors.newSingleThreadExecutor()

   创建一个单线程化的 executor,它只创建唯一的 worker 线程来执行任务

必须前一项任务执行完毕才能执行后一项。

 

(核心线程和非核心线程的区别,没有区别,所谓核心线程是指线程不被销毁的数值范围,而哪些线程被销毁是随机的)

 

概括:高并发情况无界队列会有 OOM 的风险,所以推荐做法是使用 ThreadPoolExecutor(定时和周期性任务使用 ScheduledThreadPoolExecutor)

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

 
创建一个新的 ThreadPoolExecutor 与给定的初始参数和默认线程工厂和拒绝执行处理程序。  

corePoolSize 核心线程数量

maximumPoolSize 最大线程数量

keepAliveTime 线程保持时间,N 个时间单位

unit 时间单位(比如秒,分)

workQueue 阻塞队列

threadFactory 线程工厂

handler 线程池拒绝策略

 

今天关于JAVA线程池 之 Executors 二 原理分析的分享就到这里,希望大家有所收获,若想了解更多关于Executor 框架(二)Executor 与 ExecutorService 两个基本接口、ExecutorService java线程池主线程等待子线程执行完成、Executor框架(二)Executors、ThreadPoolExecutor以及线程池执行任务的行为方式、Java executors 创建线程池和使用 ThreadPoolExecutor等相关知识,可以在本站进行查询。

本文标签: