Java ThreadPool 线程池

Java 中线程池源码分析。

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
public interface Executor {}
/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* Thread pools address two different problems:
* 1. 提升处理大量异步任务的性能
* 2. 资源,线程管理方法
*
* Executors#newCachedThreadPool (unbounded thread pool, with
* automatic thread reclamation);
* Executors#newFixedThreadPool (fixed size thread pool);
* Executors#newSingleThreadExecutor (single background thread), that
* preconfigure settings for the most common usage
* scenarios. Otherwise, use the following guide when manually
* configuring and tuning this class:
*
* A {@code ThreadPoolExecutor} will automatically adjust the
* pool size according to the bounds set by
* corePoolSize (see {@link #getCorePoolSize}) and
* maximumPoolSize (see {@link #getMaximumPoolSize}).
*
* 1. 当提交一个任务时,如果当前处于运行状态的线程数量小于corePoolSize,就会新建一个线程来
* 处理任务,即使有其它的工作线程已经处于idle状态;
* 2. 如果当前处于运行状态的线程数量大于corePoolSize,并且无法将任务添加到queue中,
* 并且线程数量小于maximumPoolSize,也会新创建一个线程;
*
* 所以,如果设置corePoolSize和maximumPoolSize相同,就创建了一个固定大小的线程池;
* 如果设置maximumPoolSize为一个本质上无界的数比如Integer.MAX_VALUE, you allow the
* pool to accommodate an arbitrary number of concurrent tasks
* (允许池容纳任意数量的并发任务。)
*
* By default, 只有任务到来的时候,core thread才会被创建并开始,
* 但是可以通过重写{#prestartCoreThread} 或者 {#prestartAllCoreThreads}.
* You probably want to prestart threads if
* you construct the pool with a non-empty queue.
*
* New threads are created using a {@link ThreadFactory}.
* Executors#defaultThreadFactory is used for default.
* it creates threads to all be in the same {@link
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
* non-daemon status. By supplying a different ThreadFactory, you can
* alter the thread's name, thread group, priority, daemon status, etc
*
* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime .
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* allowCoreThreadTimeOut(boolean)也可以改变这个策略, 只要keepAliveTime不为0.
*
* Queuing
* 1. 如果小于corePoolSize数量的线程在运行,就将创建新线程,而不会入到任务列。
* 2. 如果大于corePoolSize数量的线程在运行,就将添加到队列中,而不是创建新线程。
* 3. 如果任务不能被添加到队列中,并且线程数量小于maximumPoolSize,则会创建一个新的线程。
*
* 有三种队列管理策略:
*
* 1. Direct handoffs(直接切换). A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. 如果没有可以立即使用的线程,尝试将任务入列将失败
* 此时将创建新的线程来执行任务,这一策略避免了当处理多个可能有相互依赖关系的任务时造成死机
* Direct handoffs 通常使用无界的maximumPoolSizes来避免task被rejection.
*
* 2. Unbounded queues(无界队列). 使用不指定容量的无界队列,当所有的coreThread
* 都在运行状态时,新添加的任务将添加到队列中进行等待执行的状态,所以,不会有超过
* corePoolSize数量的线程被创建。(And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate(适当) when
* each task is completely independent(独立) of others;
* While this style of queuing can be useful in smoothing out
* transient(短暂的) bursts(爆炸) of requests.
*
* 3. Bounded queues. A bounded queue {@link ArrayBlockingQueue} helps
* prevent resource exhaustion(枯竭) when used finite(有限的) maximumPoolSizes,
* but can be more difficult to tune and control.
* Queue sizes and maximumPoolsize 可以相互转换配套使用:
* 1. large queues and small pools minimizes CPU usage, OS resources, and
* context-switching overhead, but may lead to low throughput(吞吐量).
* If tasks frequently block, a system may be able to schedule
* time for more threads than you otherwise allow.
* 2. small queues with larger pool sizes, which keeps CPUs busier but
* may 遇到不可接受的调度开销,这也会降低吞吐量,也会降低吞吐量 *
* /
/**
* @param corePoolSize 一直保存在池中的线程数量,即使线程的运行状态已经是idle,
* 除非allowCoreThreadTimeOut为true
* @param maximumPoolSize 最大允许的线程数量
* @param keepAliveTime 当线程数量大于coreSize时,当线程进入idle状态后,将等待一定的时间
* 如果有新任务进入,则执行任务,否则,terminate。
* @param unit the time unit
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 执行任务,新任务可能被添加到已经线程中,也可能新建线程来执行。
* 如果任务不能被提交执行,可能是executor已经shutdown或者queue的容量已满,
* 则task将被reject{@code RejectedExecutionHandler}.
*
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 1. 如果处于运行状态的线程数量小于corePoolSize,则尝试新建一个线程,addWorker
* 自动检查检查运行状态和workerCount,避免在不应该添加线程的时候添加线程。
* 2. 如果未启动新线程执行任务,则尝试将任务加入队列,如果成功,仍然需要再次执行检查
* >1.确保workerCount不为0(因为上次检查之后线程可能死掉了)。
* >2.确保executor还是RUNNING状态
* 3. 如果不能添加到队列中,则再次尝试启动线程来执行任务,如果失败了,则可能是已经
* shut down或者执行器已经饱和了,此时reject task。
*/
int c = ctl.get();
// step1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// step2
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// step3
else if (!addWorker(command, false))
reject(command);
}

addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 根据当前线程池的的state和corePoolSize及maximumPoolSize,决定是否添加一个任务,
* 因此,workerCount也被相应的调整,同时,一个新的worker被创建并且开始执行firstTask。
* 如果pool已经停止了或者调用了shutdown(),则该方法返回false;
* 如果factory创建线程失败(比如异常发生,OutOfMemoryError int Thread.start()),
* 也会返回false,同时,线程池回滚.
*
* @param firstTask 被执行的任务.
* 如果工作线程数量小于corePoolSize(对应上面的step1),则创建一个新的Worker。
* 如果queue已经满了(step2),此时也会创建一个Worker,如果线程数量小于maximumPoolSize。
* 初始的idle线程通常通过prestartCoreThread来创建。
*
* @param core 是否是coreThread
* 如果core为true,则使用corePoolSize来判断线程池的边界;
* 如果core为false,则使用maximumPoolSize来判断线程池边界;
*
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// rs>=SHUTDOWN,并且(rs!=SHUTDOWN 或 firstTask不为null 或 workQueue为空)
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&
firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果wc已经大于线程池的容量,则返回false。
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS操作增加线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker,简单说就是检查pool的状态,如果是running,判断当前的wc是否大于corePoolSize
或者大于maxmumPoolSize,如果不是,则尝试增加ctl的数值,如果成功,则创建一个Worker,
同时也会创建一个Thread来执行任务,将创建的worker添加到workers集合中。

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker继承自AbstractQueuedSynchronizer,并且实现了RunnableWorker把自己作为参数传入Thread,当线程执行的时候,就会执行到runWorker方法;

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 循环从queue中获取task并执行任务
*
* 1. 一开始运行的时候,有firstTask,所以不需要getTask去获取任务,当执行完成firstTask后,
* 则需要使用getTask去获取下一个待执行的任务,如果getTask返回null,则worker将根据pool
* state或者其它配置参数而退出。
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. 每个任务被执行前都被调用beforeExecute, 可以在该方法中抛出异常,终止任务。
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
// Worker执行完成
}
}

getTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 获取任务(阻塞) returns null if this worker must exit because of any of:
* 1. There are more than maximumPoolSize workers
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允许coreThread超时,或者wc>corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. wc大于maximumPoolSize或者允许超时的情况下超时
// 2. wc大于1,或者workQueue为空
// 如果两个条件满足,则返回null,没有可执行的任务;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// workQueue.poll获取任务,如果超过了keepAliveTime,则返回null;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 处理一个已经完成的Worker,只能从Worker线程调用。该方法将线程从workset中移除,
* 并且可能终止线程池或者替换worker,如果task产生exception或者少于corePoolSize
* 的工作线程在运行,或者queue非空但是却没有workers。
* And possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
/* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). */
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) { // 非异常完成
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 最小的运行worker数量为0,但workQueue不为空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 工作线程数量>=min,不再添加Worker;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

tryTeminate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible(合适的) to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate.
* 当任何可能导致termination的事件触发时,该方法必须被调用,
* 1. 减少workerCount
* 2. 从queue中移除task当shutdown的时候
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // 打断一个worker
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果tryLock()成功,则表示该worker没有在执行任务;
// 因为执行任务的时候会获取到Lock再执行;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 启动shutdown,(在之前提交的任务被执行完成之后),此时将不再接收新任务;
* 该方法不会等到所有任务执行完成后再向下执行,也即不会阻塞;
* Use {@link #awaitTermination awaitTermination} to do that.
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
// 立即停止
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 设置状态为STOP,在getTask方法中,如果遇到STOP,则直接返回null
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

invokeAll & invokeAny

  • invokeAll 等待全部任务执行完成
  • invokeAny 等待其中任何一个任务执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void invokeSample() throws Exception {
mExecutor = Executors.newFixedThreadPool(4);
List<StringCallable> callableList = new ArrayList<>();
Random rd = new Random();
callableList.add(new StringCallable("A1", Math.abs(rd.nextLong() % 1000)));
callableList.add(new StringCallable("B1", Math.abs(rd.nextLong() % 1000)));
callableList.add(new StringCallable("C1", Math.abs(rd.nextLong() % 1000)));
// String result = mExecutor.invokeAny(callableList);
// System.out.println("result = " + result);
List<Future<String>> resultList = mExecutor.invokeAll(callableList);
for (Future<String> f : resultList) {
String s = f.get();
System.out.println("result = " + s);
}
mExecutor.shutdown();
}

ExecutorCompletionService

在线程池中的任务完成后,逐个去获取执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void completionService() throws Exception {
mExecutor = Executors.newFixedThreadPool(4);
ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(mExecutor);
ecs.submit(new StringCallable("A1", 5000));
ecs.submit(new StringCallable("B1", 2500));
ecs.submit(new StringCallable("C1", 1000));
// mExecutor.awaitTermination(6, TimeUnit.SECONDS);
// 根据任务完成的顺序,获取到Future。
Future<String> future = null;
while ((future = ecs.take()) != null) {
String str = future.get();
System.out.println("result = " + str);
}
mExecutor.shutdown();
}

固定线程数量

可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。池大小固定,如果当前需要执行的任务超过了池大小,那么多于的任务等待状态,直到有空闲下来的线程执行任务,而当执行的任务小于池大小,空闲的线程也不会去销毁。

1
2
3
4
5
6
7
8
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 创建可以容纳3个线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

动态分配线程

根据需要创建新线程的线程池,在有已经构造的线程可用时将重用它们。CachedThreadPool会创建一个缓存区,将初始化的线程缓存起来,如果线程有可用的,就使用之前创建好的线程,如果没有可用的,就新创建线程,终止并且从缓存中移除已有60秒未被使用的线程。

1
2
3
4
5
6
7
8
ExecutorService threadPool = Executors.newCachedThreadPool();
// 线程池的大小会根据执行的任务数动态分配
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

单个线程

使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 单个线程的线程池,如果当前线程在执行任务时突然中断,则会创建一个新的线程替代它继续执行任务
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
static class DelegatedExecutorService extends AbstractExecutorService {}

SingleThreadExecutor,使用FinalizableDelegatedExecutorService包裹原因是啥?

定时/延迟 线程

可安排在给定延迟后运行命令或者定期地执行的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
// 效果类似于Timer定时器
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,实现了ScheduledExecutorService接口,用于分发任务;