本文共 5622 字,大约阅读时间需要 18 分钟。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl:
使用AtomicInteger
,目前够用,如果未来会有问题,可以改成AtomicLong,现在来说执行效率更高. c = ctl.get() < SHUTDOWN 表示在运行中的状态 COUNT_BITS是29, 各常量的二进制字节码如下:
CAPACITY: 00011111111111111111111111111111(前3位是0,后29位是1,共32位,约5亿) RUNNING: 11100000000000000000000000000000(负数,前3位是1,后29位是0,共32位) SHUTDOWN: 00000000000000000000000000000000(正数) STOP: 00100000000000000000000000000000(正数) TIDYING: 01000000000000000000000000000000(正数) TERMINATED: 01100000000000000000000000000000(正数) 以上可知:// 与运算后只有c的后29位有效private static int workerCountOf(int c) { return c & CAPACITY; }
中的入参c,即为ctl的值.该方法含义是:获取工作线程数(取后29位),初始值是0
// 取前三位,~表示非运算private static int runStateOf(int c) { return c & ~CAPACITY; }
先对CAPACITY进行非运算,变为11100000000000000000000000000000,再进行与运算,取前三位,表示线程的状态
private static int ctlOf(int rs, int wc) { return rs | wc; }
运行状态runState和工作线程数workerCount取或,即二者拼接起来.
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
这一步的前提是: 线程池在运行中,队列未满.
如果线程池已经关闭,要移除任务,执行拒绝策略. 如果工作线程已经死了(workerCountOf(recheck) == 0),需要重新创建线程.(这里要怎么理解呢?不是已经创建了核心线程吗,怎么会有工作线程为0的情况?)addWorker(null, false)这里false表示是非核心线程,加的任务是null,是因为任务在前面已经放到队列里面了:workQueue.offer(command)
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
addWorker前先做判断:
如果线程池为RUNNING状态或者为SHUTDOWN状态且此时任务队列仍有任务未执行完时,可以继续调用addWorker添加工作线程,但不能新建任务,即firstTask参数必须为null.否则这里将返回false,即新建工作线程失败。 简单地说,就是如果是shutdown状态,就不能再往队列里面加任务了.但是如果shutdown状态下任务队列还有任务,就还可以创建线程去执行任务.for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }
如果 工作线程数大于CAPACITY 或者大于corePoolSize或maximumPoolSize(视入参core而定),则返回false.
compareAndIncrementWorkerCount
将ctl的值+1, 成功就跳出循环往下执行.否则就重新获取ctl的值,判断是否已经更新,以确定继续进行内部循环(未更新)或者外部循环(已更新)
如果addWorker
返回成功,那么这时候线程已经开始执行相应的任务了;
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池,以减少线程切换带来的性能开销。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。对于runWorker方法,一开始对代码
try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 此处省略部分代码... } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
中的processWorkerExit部分不解,以为走到这里不就是销毁worker对象了吗?那么岂不是每个任务都要创建worker对象,即创建新线程?
其实不是这样的.里面的while (task != null || (task = getTask()) != null)
部分会一直执行下去,如果没有task,代码会阻塞在这里,直到从队列里面获取到task为止. try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }
而processWorkerExit方法的执行的条件是上面的while循环结束了.while循环结束的原因是:
线程池SHUTDOWN,或者工作线程数 > corePoolSize,且队列为空,没有task. 这就好理解了: 线程池关闭 --> 销毁全部worker对象, 线程数 > corePoolSize,且队列为空 --> 销毁大于核心线程数的worker对象! 细节代码区分: 这2处需要区分一下. t.start()的t是worker里面的Thread对象,该对象封装的Runnable对象正是worker.所以start方法启动线程后就是执行worker#run方法,即worker#runWorker方法. task.run()的task就是传入的用户写的任务.转载地址:http://iebws.baihongyu.com/