
线程池的7中创建方式
当执行很多较短时间的任务时,其线程的复用率比较高,会显著提升性能,而线程60s后会回收,意味没有任务进来,
一、创建线程池的方式可分为两大类
- 通过
ThreadPoolExecutor
手动创建线程池 - 通过
Executors
执行器自动创建线程池
以上为两类创建线程池的方式,具体实现有如下7种方法:
方法 | 解释说明 |
---|---|
ThreadPoolExecutor |
手动创建线程池,可自定义相关参数,最多可设置 7 个参数 |
Executors.newFixedThreadPool |
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待 |
Executors.newCachedThreadPool |
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程 |
Executors.newSingleThreadExecutor |
创建单个线程数的线程池,可保证先进先出的执行顺序 |
Executors.newScheduledThreadPool |
创建一个可以执行延迟任务的线程池 |
Executors.newSingleThreadScheduledExecutor |
创建一个单线程的可以执行延迟任务的线程池; |
Executors.newWorkStealingPool |
创建一个抢占式执行的线程池,任务执行顺序不确定 ;JDK 1.8 添加 |
二、线程池的创建及具体使用代码示例
1、ThreadPoolExecutor 【手动创建,阿里巴巴建议使用此方式】
maven
前置工作,pom
文件引入如下依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
引入上述依赖主要用来创建线程池工厂
new ThreadFactoryBuilder().setNameFormat("- 高高手动创建的线程池-%d").build()
使用;
若不引入上述依赖,也可替换其他依赖,或者直接使用默认的线程池工厂:
Executors.defaultThreadFactory()
代码示例:
@Slf4j
@SpringBootTest
class PublicWechatApplicationTests {
private ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() << 2, 1200L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000), new ThreadFactoryBuilder().setNameFormat("- 高高手动创建的线程池-%d").build(), new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadPoolExecutor() {
for (int i = 0; i < 10; i++) {
final int value = i;
executor.execute(() -> {
log.info("请执行具体业务逻辑:{}", value);
});
}
}
}
执行结果:
2023-12-18 18:06:47.588 INFO 16432 --- [ - 高高手动创建的线程池-7] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:7
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-3] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:3
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-0] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:0
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-2] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:2
2023-12-18 18:06:47.589 INFO 16432 --- [ - 高高手动创建的线程池-0] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:9
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-5] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:5
2023-12-18 18:06:47.589 INFO 16432 --- [ - 高高手动创建的线程池-7] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:8
2023-12-18 18:06:47.588 INFO 16432 --- [ - 高高手动创建的线程池-6] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:6
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-1] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:1
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-4] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:4
小结:
ThreadPoolExecutor 相比于其他方式 创建线程池的优势在于:它可以通过参数来控制最大任务数和拒绝策略,让线程池的执行更加透明和可控,可以规避一些未知的风险。
2、newFixedThreadPool 【常见】
Executors.newFixedThreadPool(int var0)
源码如下:
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
因传入的参数
var0
,即使创建的核心线程数也是线程总数,所以只能创建核心线程数。
因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
使用 Executors.newFixedThreadPool
创建5个固定大小的线程池,代码示例如下:
public void testFixedThreadPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newFixedThreadPool(5);
//创建一个线程
Thread thread = new Thread(() -> {
log.info("执行任务中,线程:{}", Thread.currentThread().getName());
});
//execute 执行没有返回结果
service.execute(thread);
//创建一个有返回结果的线程
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
// 此处执行具体的业务逻辑
return "线程返回结果....";
}
};
// submit 执行结束后可获取返回结果
Future<String> submit = service.submit(callable);
String result = submit.get();
log.info("Callable 线程 {} 执行任务 结果:{}", Thread.currentThread().getName(), result);
Thread.sleep(3000L);
service.execute(thread);
}
3、newCachedThreadPool 【常见】
Executors.newCachedThreadPool()
源码如下:
public static ExecutorService newCachedThreadPool() {
// 2147483647 = Integer.MAX_VALUE
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
运行流程如下:
1. 提交任务进线程池
2. 因 `corePoolSize` 为 0 ,故不创建核心线程,线程池最大为 2147483647 (即:Integer.MAX_VALUE)
3. 尝试将任务添加到SynchronousQueue队列
4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从
SynchronousQueue拉取任务并在当前线程执行
6. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞
总结:
当执行很多较短时间的任务时,其线程的复用率比较高,会显著提升性能,而线程60s后会回收,意味没有任务进来,CacheThreadPool
并不会占用很多资源
适用场景:
根据短时间的任务量来决定创建的线程数量,适合于短时间内有突发大量任务的处理场景。
代码示例如下:
public void testCachedThreadPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}
[区别比较] newFixedThreadPool和 newCachedThreadPool 比较
newFixedThreadPool
只创建核心线程,而newCachedThreadPool
只会创建非核心线程。- 在
getTask()
方法,如果队列没有任务可取,核心线程会一直阻塞在LinkedBlockingQueue.take()
,线程不会被回收,而newCachedThreadPool
会在60s
后回收。- 因核心线程不会被回收,一直阻塞,固在没有任务的情况下,
newFixedThreadPool
占用资源更多。- 两者几乎不会触发拒绝策略;
newFixedThreadPool
因阻塞队列很大(最大为Integer最大值),所以几乎不会触发拒绝策略;newCachedThreadPool
因为线程池很大(最大为Integer最大值),线程数几乎不会大于最大线程数,所以也几乎不会触发拒绝策略。
4、newSingleThreadExecutor 【常用】
Executors.newSingleThreadExecutor()
源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
通过源码分析可知:仅仅只有一个核心线程(
corePoolSize
==maximumPoolSize
=1
),采用new LinkedBlockingQueue()
队列(容量很大),不会创建 ”临时工“ 非核心线程,所有任务按照先来先执行
的顺序执行,,如果此线程不处于空闲中,则新来的任务会存储在阻塞队列中等待执行。
代码示例如下:
public void testSingleThreadExecutor() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}
5、newScheduledThreadPool 【常用】
代码示例如下:
public void testScheduledThreadPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}
6、newSingleThreadScheduledExecutor
代码示例如下:
public void testSingleThreadScheduledExecutor() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newSingleThreadScheduledExecutor();
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}
7、newWorkStealingPool
代码示例如下:
public void testWorkStealingPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newWorkStealingPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}
以上线程均可使用简洁方式
public void simpleThread() {
ExecutorService service = Executors.newFixedThreadPool(5);
service.execute(() -> {
// 具体的业务逻辑
// ....
// .....
});
}
究竟选用哪种线程池呢?
该如何选择线程池处理并发呢?阿里巴巴《Java开发手册》提供的一套规定:
1、
【强制】
:获取单例对象需要保证线程安全,其中的方法也要保证线程安全。
说明:资源驱动类、工具类、单例工厂类都需要注意。
2、
【强制】
创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
说明:如上述手动创建线程池的工厂线程名称定义:
new ThreadFactoryBuilder().setNameFormat("- 高高手动创建的线程池-%d").build()
3、
【强制】
线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明: 线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。
如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
4、
【强制】
线程池不允许使用Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:
Executors
返回的线程池对象的弊端如下:
1)FixedThreadPool
和SingleThreadPool
:
允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致OOM
。
2)CachedThreadPool
:
允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM
。
5、
【强制】
必须回收自定义的 ThreadLocal 变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的 ThreadLocal 变量,可能会影响后续业务逻辑和造成内存泄露等问题。
尽量在代理中使用
try-finally
块进行回收。
objectThreadLocal.set(userInfo);
try {
// ...
} finally {
objectThreadLocal.remove();
}
6、
【强制】
高并发时,同步调用应该去考量锁的性能损耗。能用无锁数据结构,就不要用锁;能锁区块,就不要锁整个方法体;能用对象锁,就不要用类锁。
说明:尽可能使加锁的代码块工作量尽可能的小,避免在锁代码块中调用
RPC
方法。
7、
【强制】
对多个资源、数据库表、对象同时加锁时,需要保持一致的加锁顺序,否则可能会造成死锁。
说明:线程一需要对表 A、B、C 依次全部加锁后才可以进行更新操作,那么线程二的加锁顺序也必须是 A、B、C,否则可能出现死锁。
8、
【强制】
在使用阻塞等待获取锁的方式中,必须在try
代码块之外,并且在加锁方法与try
代码块之间没有任何可能抛出异常的方法调用,避免加锁成功后,在finally
中无法解锁。
说明一:如果在
lock
方法与try
代码块之间的方法调用抛出异常,那么无法解锁,造成其它线程无法成功获取锁。
·
说明二:如果lock
方法在try
代码块之内,可能由于其它方法抛出异常,导致在finally
代码块中,unlock
对未加锁的对象解锁,它会调用AQS
的tryRelease
方法(取决于具体实现类),抛出IllegalMonitorStateException
异常。
·
说明三:在Lock
对象的lock
方法实现中可能抛出unchecked
异常,产生的后果与说明二相同。
正例:
Lock lock = new XxxLock();
// ...
lock.lock();
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}
反例:
Lock lock = new XxxLock();
// ...
try {
// 如果此处抛出异常,则直接执行 finally 代码块
doSomething();
// 无论加锁是否成功,finally 代码块都会执行
lock.lock();
doOthers();
} finally {
lock.unlock();
}
9、
【强制】
在使用尝试机制来获取锁的方式中,进入业务代码块之前,必须先判断当前线程是否持有锁。锁的释放规则与锁的阻塞等待方式相同。
说明:Lock 对象的 unlock 方法在执行时,它会调用 AQS 的 tryRelease 方法(取决于具体实现类),如果当前线程不持有锁,则抛出 IllegalMonitorStateException 异常。
正例:
Lock lock = new XxxLock();
// ...
boolean isLocked = lock.tryLock();
if (isLocked) {
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}
}
10、
【强制】
并发修改同一记录时,避免更新丢失,需要加锁。要么在应用层加锁,要么在缓存加锁,要么在数据库层使用乐观锁,使用version
作为更新依据。
说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于3 次。
11、
【强制】
多线程并行处理定时任务时,Timer
运行多个TimeTask
时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService
则没有这个问题。
12、
【推荐】
使用CountDownLatch
进行异步转同步操作,每个线程退出前必须调用countDown
方法,线程执行代码注意catch
异常,确保countDown
方法被执行到,避免主线程无法执行至await
方法,直到超时才返回结果。
说明:注意,子线程抛出异常堆栈,不能在主线程
try-catch
到。
13、
【推荐】
避免Random
实例被多线程使用,虽然共享该实例是线程安全的,但会因竞争同一seed
导致的性能下降。
说明:
Random
实例包括java.util.Random
的实例或者Math.random()
的方式。
正例:在
JDK7
之后,可以直接使用API ThreadLocalRandom
,而在JDK7
之前,需要编码保证每个线程持有一个单独的Random
实例。
14、【参考】
HashMap
在容量不够进行resize
时由于高并发可能出现死链,导致CPU
飙升,在开发过程中注意规避此风险。
15、【参考】
ThreadLocal
对象使用static
修饰,ThreadLocal
无法解决共享对象的更新问题。
说明:这个变量是针对一个线程内所有操作共享的,所以设置为静态变量,所有此类实例共享此静态变量,也就是说在类第一次被使用时装载,只分配一块存储空间,所有此类的对象(只要是这个线程内定义的)都可以操控这个变量。
其他文章参考资料:
更多推荐
所有评论(0)