线程池

线程池

1、为什么使用线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行的任务的程序都可以使用线程池。

线程池的使a用能够带来三个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,会消耗资源,降低系统的稳定性。

2、线程池的实现原理

image-20210528224954205

当线程池提交一个任务之后,线程池是如何处理这个任务的?

从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作

    线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

  2. 线程池判断阻塞队列是否已经满。如果阻塞队列没有满,则将新提交的任务存储在这

    个阻塞队列里。如果阻塞队列满了,则进入下个流程。

  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程

    来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

对于:ThreadPoolExecutor线程池的原理图如下:

image-20210528230026717

ThreadPoolExecutor执行execute方法分下面4种情况。

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)。

  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用

    RejectedExecutionHandler.rejectedExecution()方法

4、自定义线程池

1、拒绝策略

1
2
3
4
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}

2、自定义阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//阻塞队列
class BlockingQueue<T> {

//1. 任务队列 双向队列 放置未执行的任务
private Deque<T> queue = new ArrayDeque<>();

//2. 锁 多个线程都要想得到任务,但只能有一个线程得到
private ReentrantLock lock = new ReentrantLock();

//3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();


public BlockingQueue(int capacity) {
this.capacity = capacity;
}

//5. 容量
private int capacity;


//阻塞获取
public T take(){
lock.lock();
try{
//如果一直没有任务,消费者阻塞
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果有元素
T removeFirst = queue.removeFirst();
//唤醒
fullWaitSet.signal();
return removeFirst;
}finally {
lock.unlock();
}
}

//阻塞添加
public void put(T element){
lock.lock();

try{
//此时阻塞队列中已满,生产者阻塞
while(queue.size() == capacity){
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//如果没有满 放在尾部
queue.addLast(element);
//唤醒
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}

//获取大小
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}


//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();

try{
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);

while(queue.isEmpty()){
try {
if(nanos<=0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}

}

//带超时的阻塞添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);

while(queue.size() == capacity){
try{
if (nanos <= 0){
return false;
}

nanos = fullWaitSet.awaitNanos(nanos);
}catch (InterruptedException e){
e.printStackTrace();
}
}

queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}


public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();

try{
//判断阻塞队列是否满了
if(queue.size() == capacity){
rejectPolicy.reject(this,task);
}else{
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}

}
}

3、自定义简单的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//线程池
class ThreadPool{
//阻塞队列
private BlockingQueue<Runnable> taskQueue;

//线程集合 正在工作的线程
private HashSet<Worker> workers = new HashSet<>();

//核心线程数
private int coreSize;

//设置超时时间,超过时间,线程关闭
private long timeout;

//时间单位
private TimeUnit timeUnit;

private RejectPolicy<Runnable> rejectPolicy;


class Worker extends Thread{
private Runnable task;

public Worker(Runnable task){
this.task = task;
}

@Override
public void run() {
//执行任务
//1 当task不为空 执行任务
//2 当task执行完毕 在接着阻塞队列任务并执行
while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}

synchronized (workers){
workers.remove(this);
}

}
}


//构造函数
public ThreadPool(int coreSize,long timeout,TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}


//执行任务
public void execute(Runnable task){
//当任务数没有超过coreSize时,直接交给worker对象执行
//如果任务数超过了coreSize时,加入阻塞队列
synchronized (workers){
if(workers.size() < coreSize){
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else{
taskQueue.tryPut(rejectPolicy,task);
}
}
}



}

3、ThreadPoolExecutor

image-20210529091559420

1、线程池状态

ThreadPoolExecutor 使用int的高三位来表示线程池的状态,低29位表示线程数量。

状态名 高3位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接受新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为0即将进入终结
TERMINATED 011 - - 终结状态

将这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS操作进行赋值。

1
2
3
4
5
6
7
8
//c 为旧值 ,ctlof返回结果为新值
ctl compareAndSet(c,ctlof(targetState,workerCountof(c)));

//rs 为高三位代表线程池状态,wc为低29位代表线程个数。ctl是合并它们
private static int ctlof(int rs,int wc){
return rs | wc;
}

2、构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)

  • maximumPoolSize 最大线程数目

  • keepAliveTime 生存时间 - 针对救急线程

  • unit 时间单位 - 针对救急线程

  • workQueue 阻塞队列

  • threadFactory 线程工厂 - 可以为线程创建时起个好名字

  • handler 拒绝策略

线程池中的最大线程数 = 核心线程数 + 救急线程数

3、使用

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。

  2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。

  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。

  4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。

  5. 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTimeunit 来控制。

4、拒绝策略

拒绝策略 JDK提供了 4 种实现,其它著名框架也提供了实现

image-20210529095907639

  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略

  • CallerRunsPolicy 让调用者运行任务

  • DiscardPolicy 放弃本次任务

  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题

  • Netty 的实现,是创建一个新线程来执行任务

  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略

  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

4、几种常见的线程池

1、newFixedThreadPool(固定数目线程的线程池)

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

特点:

  • 核心线程数 等于 最大线程数 ===> 没有救急线程被创建,因此也无需超时时间。

  • 阻塞队列是无界的,可以放问题数量的任务

工作机制:

  1. 提交任务
  2. 如果线程数少于核心线程,创建核心线程执行任务
  3. 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  4. 如果线程执行完任务,去阻塞队列取任务,继续执行。

适用于任务量已知,相对耗时的任务

2、newCachedThreadPool(可缓存线程的线程池)

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

特点:

  • 核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s。

  • 全部都是救急线程(60s 后可以回收)。救急线程可以无限创建

  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的

工作机制:

  1. 提交任务
  2. 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
  3. 判断是否有空闲线程,如果有,就去取出任务执行。
  4. 如果没有空闲线程,就新建一个线程执行。
  5. 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。

极端情况下会创建过多的线程,耗尽 CPU 和内存资源。

由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

适合任务数比较密集,但每个任务执行时间较短的情况

3、newSingleThreadExecutor(单线程的线程池)

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

特点:

  • 线程数固定为 1。
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

工作流程:

  1. 提交任务
  2. 线程池是否有一条线程在,如果没有,新建线程执行任务
  3. 如果有,将任务加到阻塞队列
  4. 当前的唯一线程,从队列取任务,执行完一个,再继续取,一条线程夜以继日地干活。

newSingleThreadExecutor和我们自己创建一个线程的区别:

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。

newSingleThreadExecutorExecutors.newFixedThreadPool(1)的区别

  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

适合希望多个任务排队执行。

4、newScheduledThreadPool(定时及周期执行的线程池)

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

特点:

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

工作机制:

  1. 提交任务

  2. 线程池中的线程从 DelayQueue 中取任务

  3. 线程从 DelayQueue 中获取 time 大于等于当前时间的task

  4. 执行完后修改这个 task 的 time 为下次被执行的时间

  5. 这个 task 放回DelayQueue队列中

适用于周期性执行任务的场景,需要限制线程数量的场景

5、线程池的工作队列

1、ArrayBlockingQueue

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

2、LinkedBlockingQueue

LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务。

容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列

3、DelayQueue

DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

4、PriorityBlockingQueue

PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列

5、SynchronousQueue

SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

6、提交任务

1、execute

1
void execute(Runnable command)
1
2
3
4
5
6
7
8
9
10
11
12
13
ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务");
}
});

2、submit

1
2
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
1
2
3
4
5
6
7
8
9
10
ExecutorService pool = Executors.newFixedThreadPool(2);

Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("执行任务");
Thread.sleep(10000);
return "ok";
}
});

3、invokeAll

1
2
3
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ExecutorService pool = Executors.newFixedThreadPool(2);

List<Future<String>> invokeAll = pool.invokeAll(Arrays.asList(
() -> {
Thread.sleep(1000);
return "1";
},
() -> {
Thread.sleep(2000);
return "2";
},
() -> {
Thread.sleep(3000);
return "3";
}
));

invokeAll.forEach(f -> {
try {
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});

4、invokeAny

1
2
3
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService pool = Executors.newFixedThreadPool(2);

String any = pool.invokeAny(Arrays.asList(
() -> {
Thread.sleep(10000);
return "1";
},
() -> {
Thread.sleep(5000);
return "2";
},
() -> {
return "3";
}
));

System.out.println(any); //2

7、关闭线程池

1、shutdown

1
2
3
4
5
6
7
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}

2、shutdownNow

1
2
3
4
5
6
7
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}

// 尝试终结
tryTerminate();
return tasks;
}

3、其它方法

1
2
3
4
5
6
7
8
9
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,
// 因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

线程池
https://johnjoyjzw.github.io/2020/10/05/线程池/
Author
John Joy
Posted on
October 5, 2020
Licensed under