线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。—-百度百科

我们首先来看下四种常用的线程池吧,下面是四种常用线程池的介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// 我们首先来看下四种常用的线程池吧
import java.util.concurrent.*;

public class ExecutorThreadPool {
//newCachedThreadPool
/*
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
工作线程的创建数量几乎没有限制,这样可灵活的往线程池中添加线程
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则改工作线程将自动终止,终止后,如果又提交了新的任务,
则线程池重新创建一个工作线程
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量的线程同时运行,很有可能会造成系统瘫痪
*/
private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//newFixedThreadPool
/*
创建一个指定工作线程数量的线程池,每当提交一个任务就创建一个线程,如果工作线程数量达到线程池初始的最大数量,则将提交的任务存入到池队列中,
FixedThread是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线城时所消耗的开销的有点,但是,在线程池空闲的时候,它不会释放
工作线程,还会占用一定的系统资源
*/
private final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

//newSingleThreadExecutor
/*
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会有唯一的线程来执行任务,保证所有的任务按照指定顺序来执行,如果这个
线程异常结束,将会有另一个来取代它(待商榷),保证顺序执行,单工作线程的最大特点是可保证顺序的执行各个任务,并且在任意给定事件不会有多个线程是活动的
*/
private final ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

//newScheduleThreadPool
/*
创建一个定长的线程池,而且支持定时的以及周期行的任务执行
*/
private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(5);


public static void main(String[] args) {
ExecutorThreadPool executorThreadPool = new ExecutorThreadPool();
executorThreadPool.testCachedThreadPool();
executorThreadPool.testFixedThreadPool();
executorThreadPool.testNewSingleThreadPool();
executorThreadPool.testScheduleThreadPool();
}


public void testCachedThreadPool() {
cachedThreadPool.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("CachedThreadPool" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

public void testFixedThreadPool() {
for(int j = 0; j < 4; j++) {
fixedThreadPool.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("第" + i + "个任务" + "FixedThreadPool" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

public void testNewSingleThreadPool(){
singleThreadPool.execute(() -> {
for(int i = 0; i < 10; i++) {
System.out.println("SingleThreadPool" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

public void testScheduleThreadPool(){
scheduleThreadPool.scheduleAtFixedRate(()-> {
for (int i = 0; i < 10; i++) {
System.out.println("ScheduleThreadPool" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, 2, TimeUnit.SECONDS);
}
}

但这里我们不推荐使用Executors去创建线程池,是不符合编程规范的,推荐使用ThreadPoolExecutor去创建

线程池的优点

管理一组工作线程,通过线程池复用线程有以下几点优点:

  • 减少资源创建:减少内存开销,创建线程占用内存(特别是并发比较高的生产环境)
  • 降低系统开销:创建线程需要时间,会延迟处理的请求
  • 提高稳定性:避免无限创建线程引起的OutOfMemoryError

Executors创建方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM.
2)newCachedThreadPool和newScheduledThreadPool:

主要问题是线程数最多数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM.

我们简单看看这个语句

1
private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

关注一下源码实现

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

底层其实还是Executors对ThreadPoolExecutor进行的封装,推荐使用其实还是让使用者们夺取关注一下其底层的实现,根据业务场景的需要做适合的调整

这里说一下每个参数的含义

构造函数参数说明:

  • corePoolSize => 线程池核心线程数量
  • maximumPoolSize => 线程池最大数量
  • keepAliveTime => 空闲线程存活时间
  • unit => 时间单位
  • workQueue=> 线程池所使用的缓冲队列
  • threadFactory => 线程池创建线程使用的工厂
  • handler => 线程池对拒绝任务的处理策略

img

前几个参数比较容易理解,这里说一下任务队列与线程池拒绝策略

首先任务队列我们又可以叫做阻塞队列:

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

LinkedBlockingDeque

LinkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

LinkedBlockingQueue

LinkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

DelayQueue

是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。

PriorityBlockingQueue

PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

PS:队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。

线程池的拒绝策略

所有拒绝策略都实现了接口 RejectedExecutionHandler

1
2
3
4
5
6
7
8
9
public interface RejectedExecutionHandler {

/**
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
AbortPolicy

直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐

1
2
3
4
5
6
7
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy

在调用者线程中(也就是说谁把 r 这个任务甩来的),运行当前被丢弃的任务。

只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。

如果线程池已经被关闭,则直接丢弃该任务。

1
2
3
4
5
6
7
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardOledestPolicy

丢弃队列中最老的,然后再次尝试提交新任务。

1
2
3
4
5
6
7
8
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy

默默丢弃无法加载的任务。

1
2
3
4
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
自定义线程池拒绝策略

通过实现 RejectedExecutionHandler 接口扩展

示例:dubbo,直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件(JVM内存文件快照的输出)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
}