下方代码块是线程池的完整构造函数。
下方是ThreadPoolExecutor的execute方法
官方文档给出了三个步骤 文档的解释里,第一步中的addWorker是比较难理解的,所以需要继续查看addWorker方法的源码。 addWorker上半部分源码 第一个if中表示,如果当前运行的状态是SHUTDOWN,并且传入的任务为空,并且队列不为空的情况,则()中为true,前面加了个!则在这种情况下&&整个判断为false,则不会执行下面的return false。因此这部分主要是为了判断队列是否为空。 wc是当前工作的线程数量 下半部分代码 这里的步骤主要是 在这个步骤中,他再一次判断了是否需要添加新线程,如上述步骤2中所说,因为现有的线程在上次检查后死亡了,或者池在进入此方法后关闭。因此会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。 接下来看看Woker的逻辑 在Woker中会调用ThreadFactory来创建一个新线程。在上方的t.start()中就会调用这个run方法。 接下来继续看runWoker的逻辑,可以发现主要就是判断任务是否为空,不为空则运行。 最后再来看看任务判空中的getTask()方法 所以总结一下流程如下图所示
线程池的创建
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
线程池的执行流程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//步骤1 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//步骤2 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false))//步骤3 reject(command); }
start a new thread with the given command as its first
task. The call to addWorker atomically checks runState and
workerCount, and so prevents false alarms that would add
threads when it shouldn’t, by returning false.
如果当前线程数小于核心线程数,则创建一个线程并尝试使用给定的指令作为其第一个任务启动新线程。对addWorker方法调用会自动检查 runState和workerCount,以此通过返回false来防止在不应该添加线程的情况下出现添加线程的错误。
double-check whether we should have added a thread (because existing
ones died since last checking) or that the pool shut down since
entry into this method. So we recheck state and if necessary roll
back the enqueuing if stopped, or start a new thread if there are
none.
如果任务可以成功进入队列,仍然需要再次检查是否应该添加一个线程(因为现有的线程在上次检查后死亡了),或者池在进入此方法后关闭。因此,我们会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。
fails, we know we are shut down or saturated and so reject the task.
如果不能将任务放入队列,则尝试添加一个新线程。如果失败了,则认为终止或者饱和,因此拒绝这个任务。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查队列是否为空. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } ......... }
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
这块代码是在创建非核心线程时,即core等于false。判断当前线程数是否大于等于maximumPoolSize,如果大于等于则返回false,即上边说到的步骤3中创建线程失败的情况。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//创建Worker对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//任务判空 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//任务运行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // allowCoreThreadTimeOut 默认是false,后者是判断当前线程数是否大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //当前线程数比核心线程数大,则执行poll()方法获取任务,超时时长和单位设置为以下参数 //否则调用take()方法阻塞队列 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算