java高级(并发编程)
,这个类在java.util.concurrent.locks包下面。
概念
为什么学
平时应用层开发用不到并发编程,或者用的较少,但是tomcat、以及消息队列以及一些框架的底层一定涉及并发编程。岁月静好,不过是有人负重前行。本篇记载有限,取材自黑马并发编程,详细笔记可参考:java并发编程大全
进程
是操作系统的应用对应一个进程,浏览器是一个进程,网易云音乐也是一个进程,支持多开的软件叫多进程,为了加载数据之内存
线程
一个进程中包含一个或者多个线程,线程可以访问内存中的共享变量
并发
就是指在一定时间内处理事情的能力叫并发,在单核cpu下,以任务调度器在一定时间内以时间片方式在多个线程下切换,总结就是宏观并行,微观串行。
并行
真正同时的执行两个线程叫并行,在多核cpu下,两个核心同时处理两个线程,但真正的场景下却是线程数远高于cpu核心数量,所以在运行的时候是 并发+并行同时存在。
同步异步
方法调用者需要等待结果返回叫同步,方法调用者不需要等待结果返回,直接进行下一步操作叫异步
public class test {
public static void main(String[] args) {
FileReader.read(Constants.MP4_FULL_PATH); // 同步操作
System.out.println("do other things");
}
}
// 同步读取,读取文件为耗时io操作,
// 日志打印为
// [mian] 读取文件
// [main] do other things
// 表示为主线程同步执行,do other things在读取文件之后输出
public class test {
public static void main(String[] args) {
new Thread(()->FileReader.read(Constants.MP4_FULL_PATH)).start(); // 异步操作
System.out.println("do other things");
}
}
// 日志打印为
// [mian] do other things
// [thread-0] 读取文件
// 主线程进来,重新开启了一个线程读取文件,do other things 并没有等待读取文件结束继续执行,这种情况为异步
在java中,只能使用多线程的方法让方法执行变为异步执行,tomcat的异步servlet也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞tomcat工作线程
线程应用
提高效率,充分利用多核cpu的优势并行跑多个线程执行,提高运行效率,只有多核cpu才是提高效率,单核cpu依然是并发轮询,并不是所有计算任务都能拆分,参考[阿姆达尔定律]
io操作不占用cpu,只是我们使用的一般是[阻塞io],这是线程虽然不占用cpu,但需要等待io结束,没能充分利用线程,所以有[非阻塞io]以及[异步io]的优化
线程
创建线程
1、继承Thread类
2、实现Runnable接口
3、实现callable接口(有返回值)
4、线程池创建线程
这里只介绍后两种创建方法。
讨论一下 Thread与Runnable之间的关系
第一种方式创建线程,覆盖了run方法,直接就是一个线程+任务的形式,将线程和要执行的任务合并到一起啦,第二种方式就是将线程和任务分开啦 new Thread(Runnable),将Runnable任务传进去,最终执行的都是run方法,更推荐使用第二种方式,更容易与一些线程池的高级api结合,让任务脱离了Thread的继承体系,更加灵活
callable创建线程
FutureTask本质是实现了Runnable接口和Future接口(用来返回结果的)
Future接口定义获取返回值,获取状态等
Runnable接口线程接口
RunnableFuture实现了 Runnable和 Future
Runnable、Callable、Future以及RunnableFuture都是接口,真正干活还得靠FutureTask
FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable与Future接口,因此FutureTask既可以作为一个Runnable被Thread执行,也可以获取到Future异步计算的结果
FutureTask构造方法可以传入Runnable和Callable实现类,在线程池中做为一个任务提交submit
Future机制还是捉襟见肘,比如任务的串行执行,并行执行等,所以有了CompletableFuture的出现
CompletableFuture 可以依赖上次的执行结果,执行下次串行化,也可合并两次执行结果,完成之后进行回调
可以参考 Runnable、Future、Callable、FutureTask之间的关系
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务对象 Integer就是返回结果,传入的lamdba就是callable的call方法
FutureTask<Integer> integerFutureTask = new FutureTask<>(() -> {
return 100;
});
// 线程对象,传入任务对象
new Thread(integerFutureTask,"taskDemo").start();
// 主线程阻塞,同步等待task返回结果
Integer integer = integerFutureTask.get();
System.out.println("测试Callable接口");
}
}
线程运行原理
在java内存结构中,堆、栈、方法区,其中栈就是给线程用的,程序计数器存储本线程的调用位置,调用方式就是栈帧调用。类加载之后就会把class信息放到方法区,在由虚拟机创建线程,栈,程序计数器为线程私有,每个线程可以访问共享内存中的数据(线程安全)
线程上下文切换
因为以下原因导致cpu不在执行当前线程,转而执行另一个线程的代码,需要保存当前线程的状态,并且恢复另一个线程的状态为上下文切换
- 线程的cpu时间片用完
- 垃圾回收
- 又更高优先级的线程需要运行
- 线程自己调用了sleep、yield、wait、join、park(LockSupport.park() 锁的支持类方法,停止线程)、synchronized、lock等方法
线程常见方法
join //等待线程结束
setPriority(int) 设置优先级 最大10 默认5
isAlive 线程是否存活
isInterrupted() 判断是否被打断
interrupt() 打断线程,如果被打断的线程正在 sleep,wait,join 会导致被打断的线程抛出InterruptedException,并清除打断标记,如果打断真在运行的线程,则会设置打断标记,park线程被打断,也会设置打断标记
sleep(long n)放弃cpu使用权 睡觉
yield() 提示线程调度器让出cpu调度
sleep与yield的区别
slepp会让当前线程处于等待状态,其他线程可以打断正在睡眠的线程,会抛出异常,yield让当前线程从运行状态转为就绪状态,调度执行同优先级的线程,这时如果没有同优先级的线程,那么不能保证让当前线程停止的效果,Thread.sleep(10)在哪个线程里调用就是让哪个线程休眠,比如主线程等,其他方法也一致
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
new Thread(()->{
try {
// 此线程睡2s
Thread.sleep(2000);
}catch (InterruptedException interruptedException){
System.out.println("别烦我");
interruptedException.printStackTrace();
}
}).start();
// 主线程睡1s
Thread.sleep(1000);
// 主线程打断傻上边线程,抛出InterruptedException
Thread.interrupted();
}
}
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable task1 = ()->{
Thread.yield(); // 让出执行权,执行同级别优先级的线程 让别人先执行
System.out.println("1");
};
Runnable task2 = ()->{
System.out.println("2");
};
Thread t1 = new Thread(task1,"t1");
Thread t2 = new Thread(task2,"t2");
// 设置优先级
t1.setPriority(Thread.MAX_PRIORITY);
t2.setPriority(Thread.MIN_PRIORITY);
t1.start();
t2.start();
}
}
join(等待线程结束)
public class test {
static int r = 0;
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
}
private static void test1() throws InterruptedException {
System.out.println("开始");
Thread t1 = new Thread(()->{
System.out.println("开始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束");
r=10;
},"t1");
t1.start();
t1.join(); // 等待t1线程结束
System.out.println(r);
System.out.println("结束");
}
}
如果没有t1.join(),打印结果为0,因为主线程直接打印0,t1线程赋值为10的时候,主线程已经打印完毕,这个时候,没有线程去获取r的值。
解决办法 join 等待t1线程结束之后在打印,sleep可以吗?,也可以做到,但是你不知道t1线程需要等待多长时间,无法具体给定睡眠时间,join是唯一的解决办法,等到线程结束。
join同步,加上join之后,主线程必须等待t1结束,就是同步等待
线程设计模式(两阶段终止模式interrupt)
在一个线程t1种如何优雅的终止线程t2,这里的优雅是给t2一个料理后事的机会,错误做法使用stop终止,如果线程持有锁,那么被杀死之后没有机会释放锁,其他线程也就永远无法得到锁
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
Thread.sleep(3500);
twoPhaseTermination.stop();
}
}
class TwoPhaseTermination{
private Thread monitor;
// 启动监控线程
public void start(){
monitor = new Thread(()->{
while(true){
// 获取当前线程 此处不能直接Thread.isInterrupted 不是静态方法,必须线程对象调用
Thread current = Thread.currentThread();
// 判断是否被打断
if(current.isInterrupted()){
System.out.println("料理后事");
break;
}
try {
Thread.sleep(1000);
System.out.println("监控记录");
} catch (InterruptedException e) {
e.printStackTrace();
// 重新设置打断标记 因为sleep被打断会清除打断标记
current.interrupt();
}
}
});
monitor.start();
}
// 停止监控线程
public void stop(){
monitor.interrupt();
}
}
输出结果监控记录监控记录监控记录料理后事
主线程调用start 启动一个线程,轮询,如果当前线程被打断,则料理后事,如果没有被打断,则睡眠,执行监控记录,如果在睡眠中被打断,需要重新设置打断标记,让判断进入料理后事输出。
如果打断park被打断了 打断标记为真的话 不会被阻塞,就停不下来啦,需要手动设置false,Thread.interrupted()设置
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(()->{
System.out.println("park...");
LockSupport.park();
//重制打断标记
Thread.interrupted();
System.out.println("打断状态"+Thread.currentThread().isInterrupted());
},"t1");
t1.start();
Thread.sleep(1000);
// 打断t1
t1.interrupt();
}
守护线程
默认会等待所有非守护线程结束在结束,即使守护线程的代码没有执行完,也会强制结束,在java中垃圾回收就是守护线程
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(()->{
System.out.println("...");
},"t1");
t1.setDaemon(true);
t1.start();
Thread.sleep(1000);
// 打断t1
t1.interrupt();
}
}
线程状态
线程安全
在操作共享变量的时候,单线程没有问题,多线程的情况下,由于上下文切换导致指令错乱,变量操作并不是原子性操作,会有意想不到的结果产生
线程安全三大特性
原子性:原子性问题是一组cpu指令必须满足原子性,例如i++举例,分为三步,加载到寄存器,取值,赋值等,根本原因在于线程切换导致的问题
可见性: 可见性问题是某个线程对于线程共享变量的修改对其他线程可见,能及时的刷新回主内存,单核cpu没有可见性问题,因为用的一个cpu,根本原因在于cpu缓存导致的问题
有序性:有序性问题是针对代码,编译器会做出优化,也叫指令重排序,重排序遵从as-if-seris原则,保证单线程下的结果不变,对有依赖关系的共享变量不会做指令重排序,例如读后写等问题,根本原因在于指令重排序导致的问题,而不是线程切换,没有发生线程切换,指令重排序就可以导致严重的问题,比如单例模式
传送门:线程安全三大特性
解决线程安全三大特性:
synchronized 保证原子性,锁总线,保证可见性,强制刷新到主存,保证有序性,添加读写屏障
volatile保证可见性,刷新回主存,保证有序性,不被指令重排序,内存屏障,但是不保证读写的原子性,也可以换句话说,只保证单读,或者单写的原子性
cas保证原子性,compareAndSwap,底层与synchronized用的汇编指令一致,都是采用Lock 汇编指令
原子类等都是cas 加锁,以及用volatile保护资源
临界区
- 一个程序运行多个线程本身是没有问题的
- 问题出在多个线程访问共享资源
- 多个线程读共享资源也是没有问题的
- 多个线程对共享资源进行写操作时会发生指令交错
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
public class test {
static int counter = 0;
public static void main(String[] args) throws ExecutionException, InterruptedException {
incrment();
}
public static void incrment()
// 此处为临界区 ,对共享资源进行读写操作
{
counter++;
}
}
竞态条件 Race Condition
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
访问临界区共享变量的解决方案
1、阻塞解决方案:synchronized(对象锁)加锁串行,Lock
2、非阻塞解决方案:原子变量
语法格式
synchronized (对象)
{
//临界区
}
synchronized 不会被上下文切换而打断,继续拥有锁,等待下次执行在继续执行,释放锁,唤醒其他线程竞争 获取锁,synchronized必须锁住的是同一个对象
方法上的synchronized
public synchronized void incrment(){
counter ++;
}
等价于
public void incrment(){
synchronized(this){
counter ++;
}
}
public synchronized static void incrment(){
counter ++;
}
等价于 静态没有this 在jvm中只有一个对象
public static void incrment(){
synchronized (Test.class){
counter ++;
}
}
成员变量和静态变量是否线程安全?
静态变量不依赖特定的实例对象,而是被类的所有实例共享,位于方法区,所以线程不安全
- 如果它们没有被共享,则线程安全
- 如果它们被共享啦,根据他们的状态是否能够改变,又分为两种情况
- 如果只有读操作,则线程安全
- 如果有读写操作,这段代码是临界区,需要考虑线程安全
局部变量是否线程安全?
- 局部变量是线程安全的
- 但局部变量引用的对象则未必
- 如果该对象没有逃离方法的作用访问,它是线程安全的
- 如果该对象逃离方法的作用范围,需要考虑线程安全(对象逃逸)
public class test {
List<String> a = new ArrayList<>();
public void method1(){
a.add("1");
}
public void method2(){
a.remove(0);
}
}
// 这里的a 引用的都是同一个list对象,所以存在线程安全问题
private 和final限制子类,保证安全
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
}
public void method1(){
// 此时每个线程来都会创建一个list对象
List<String> a = new ArrayList<>();
method2(a);
// 如果子类中重写了method3并且开启了多线程,那么传入的a就是同一个list,会造成多个线程访问一个变量,产生线程安全问题
method3(a);
}
// 本方法中调用不会有问题,public也不会有问题,但是 public 如果有
// 子类实现了 method1方法,并开启了一个线程,使用List 会造成两个线程争夺一个资源的情况
// 所以 private 和final可以在一定程度上提供安全
private void method2(List<String> a){
a.add("1");
}
private void method3(List<String> a){
a.remove(0);
}
}
如果子类中重写了method3并且开启了多线程,那么传入的a就是同一个list,会造成多个线程访问一个变量,产生线程安全问题,不能控制子类行为,所以局部变量发生了逃逸,因为,用private和final禁止子类重写。
常见线程安全类
String 不可变类
Integer等包装类 不可变类
StringBuffer
Random
Vector
Hashtable
java.util.concurrent包下的所有类
这里说他们线程安全是指,多个线程调用它们同一个实例的某一个方法时,是线程安全的,说白了也就是锁的是同一个对象
每一个操作都是原子的,但是多个方法组合就不是线程安全的
举例:
Hashtable table = new Hashtable
if(table.get("key")!=null){
// 两个线程都进入到这里,线程上下文切换,就产生了线程安全问题
table.put("key",value);
}
public final class String不可变类,不可以修改能够保证线程安全,比如 subStirng方法,源码是创建了新的字符串
synchronized底层原理之Mionitor
Monitor
Monitor可以理解为一种同步工具,也可理解为一种同步机制,常常被描述为一个Java对象。Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor锁。(所以每个对象都能成为一把锁)
Monitor是在jvm底层实现的,底层代码是c++。本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的转换,状态转换需要耗费很多的处理器时间成本非常高。所以synchronized是Java语言中的一个重量级操作。
Monitor与java对象以及线程是如何关联 ?
- 如果一个java对象被某个线程锁住,则该java对象的Mark Word字段中LockWord指向monitor的起始地址
- Monitor的Owner字段会存放拥有相关联对象锁的线程id
Monitor被翻译为 监视器或管程
每个java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置成指向Monitor对象(是操作系统的对象)的指针
Monitor对象中有属性 Owner 所有者, EntryList 阻塞队列,waitSet,等待区
当有线程,进入synchronized就会检测所有者Monitor,如果有所有者已经存在,则进入EntryList阻塞队列进行阻塞,不加synchronized不会关联Monitor,加了之后只对同一个对象有作用。
synchronized优化原理(jdk6之后)
在Java早期版本中,synchronized属于重量级,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,挂起线程和恢复线程都需要转入内核态去完成,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成
锁升级过程: 无锁==>偏向锁==>轻量级锁==>重量级锁
偏向锁
Synchronized用的锁是存在Java对象头里的MarkWord中,锁升级功能主要依赖MarkWord中锁标志位和释放偏向锁标志位
在实际应用运行过程中发现,“锁总是同一个线程持有,很少发生竞争”,也就是说锁总是被第一个占用他的线程拥有,这个线程就是锁的偏向线程。
如果相等表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了,直到竞争发生才释放锁。以后每次同步,检查锁的偏向线程ID与当前线程ID是否一致,如果一致直接进入同步。无需每次加锁解锁都去CAS更新对象头。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。
假如不一致意味着发生了竞争,锁已经不是总是偏向于同一个线程了,这时候可能需要升级变为轻量级锁,才能保证线程间公平竞争锁。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程是不会主动释放偏向锁的。
轻量级锁
轻量级锁的使用场景:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。
原理是:使用线程使用cas来更新对象头的上标识更新到自己线程之上,如果成功,则持有轻量级锁,如果失败,则cas自旋不断尝试(自旋次数是自适应的,根据同一个锁上一次自旋的时间和拥有锁线程的状态来决定),尝试一定次数,线程在尝试对一个对象加轻量级锁的过程中,如果CAS操作无法成功则表明有其他线程已经为此对象加上了轻量级锁,此时将进行锁膨胀,将轻量级锁变成重量级锁升级为重量级锁
锁消除,这个也很简单,就是在不存在锁竞争的地方使用了synchronized,jvm会自动帮你优化掉,比如在方法中new了一个对象,在加锁,该对象本就是线程私有的,不存在竞争,所以jvm会自动消除该锁。
轻量级锁对使用者是透明的,即语法仍然是synchronized,会优先用轻量级锁(cas替换Object的Mark word(对象头))加锁
假设有两个同步块,利用同一个对象加锁
static final Object object = new Object();
private void method1(){
// 添加锁,对象头中有标记01为轻量级锁,
synchronized (object){
// 同步块A
method2();
}
}
private void method2(){
synchronized (object){
// 同步块B
}
}
总的来说啊,JVM设计的这套synchronized锁升级的原则,主要是为了花费最小的代价能达到加锁的目的;
比如在没有竞争的情况下,进入synchronized的使用使用偏向锁就够了,这样只需要第一次执行CAS操作获取锁,获取了偏向锁之后,后面每次进入synchronized同步代码块就不需要再次加锁了
然后在存在多个线程竞争锁的时候就不能使用偏向锁了,不能只偏心一个人,它优先获取锁,别人都看它表演,这样是不行的。于是就升级为轻量级锁
在轻量级锁模式在每次加锁和释放是都需要执行CAS操作,对比偏向锁来说性能低一点的,但是总体还是比较轻量级的。为了尽量提升线程获取锁的机会,避免线程陷入获取锁失败就立即沉睡的局面(线程沉睡再唤醒涉及上下文切换,用户态内核态切换,是一个非常重的操作,很费时间),所以设计自旋等待(默认10次);线程每次自旋一段时间之后再去重试获取锁。
当竞争非常激烈,并发很高,或者是synchronized代码块执行耗时比较长,就会积压大量的线程都在自旋,由于自旋是空耗费CPU资源的,也就是CPU在那等着,做不了其他事情,所以在尝试了最大的自旋次数之后;及时释放CPU资源,将线程挂起了。
park和unpark
它们都是LockSupport中的方法
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
先park 然后再unpark
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
// 输出结果
// 18:42:52.585 c.TestParkUnpark [t1] - start...
//18:42:53.589 c.TestParkUnpark [t1] - park...
//18:42:54.583 c.TestParkUnpark [main] - unpark...
// 18:42:54.583 c.TestParkUnpark [t1] - resume...
先unpark在park
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
// 输出结果
//18:43:50.765 c.TestParkUnpark [t1] - start...
//18:43:51.764 c.TestParkUnpark [main] - unpark...
//18:43:52.769 c.TestParkUnpark [t1] - park...
//18:43:52.769 c.TestParkUnpark [t1] - resume...
先unpark后park
在park()之前,先执行了unpark(),进而释放了一个许可,也就是说当前线程有一个可用的许可。而park()在有可用许可的情况下,是不会阻塞线程的。
与wait和notify对比
1、park和unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确
2、wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,也就是必须在同步块内使用,而 park,unpark 不必
3、可以先 unpark,而 wait & notify 不能先 notify
多把锁(细粒度锁,互不相干的操作)
一间大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低
解决方法是准备多个房间(多个对象锁)
对整个房间加锁
class BigRoom {
public void sleep() {
synchronized (this) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (this) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
// 执行
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.compute();
},"小南").start();
new Thread(() -> {
bigRoom.sleep();
},"小女").start();
优化两个锁
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
线程活跃性问题
死锁(互相持有对方的锁)、活锁(互相操作对方结束条件,永不结束)饥饿(线程被调用次数非常少)
一个线程需要同时获取多把锁,这时就容易发生死锁
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
synchronized (A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();
// 12:22:06.962 [t2] c.TestDeadLock - lock B
// 12:22:06.962 [t1] c.TestDeadLock - lock A
定位死锁
检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:
cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
22816 KotlinCompileDaemon
33200 TestDeadLock // JVM 进程
11508 Main
28468 Launcher
cmd > jstack 33200
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
2018-12-29 05:51:40
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):
"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000003525000 nid=0x2f60 waiting on condition
[0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry
[0x000000001f54f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001eb68800 nid=0x1b28 waiting for monitor entry
[0x000000001f44f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- - locked <0x000000076b5bf1c0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
// 略去部分输出
Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Thread-0":
at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- locked <0x000000076b5bf1c0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
饥饿:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,不易演示,略
ReentrantLock
可中断
可以设置超时时间
可以设置为公平锁
支持多个条件变量可重入
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
// 输出结果
//17:59:11.862 [main] c.TestReentrant - execute method1
//17:59:11.865 [main] c.TestReentrant - execute method2
//17:59:11.865 [main] c.TestReentrant - execute method3
可打断
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly(); // 设置可打断,如果是lock.lock()则不可打断
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}
锁超时(试图等待锁)
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
公平锁
ReentrantLock lock = new ReentrantLock(true);
公平锁一般没有必要,会降低并发度
条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒await signal
static ReentrantLock lock = new ReentrantLock();
// 条件变量,相当于等待区
static Condition waitCigaretteQueue = lock.newCondition();
// 条件变量,相当于等待区
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
// 输出结果
//18:52:27.680 [main] c.TestCondition - 送早餐来了
//18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
//18:52:28.683 [main] c.TestCondition - 送烟来了
//18:52:28.683 [Thread-0] c.TestCondition - 等到
共享模式内存
java内存模型JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。
JMM 体现在以下几个方面:
原子性
- 保证指令不会受到线程上下文切换的影响,切换之后,数据可能出错 i++可见性
- 保证指令不会受 cpu 缓存的影响 主内存中的数据与缓存中的数据是否一致有序性
- 保证指令不会受 cpu 指令并行优化的影响 编译器优化顺序导致的问题
原子性问题
有序性问题
可见性问题
就是主内存中的数据和缓存区的数据不一致,导致的错误
voliate和synchronized和cas
synchronized 保证 原子性、可见性、有序性
voliate 保证可见性保证有序性,不被指令重排序,内存屏障volatile的语义不足以确保递增操作的原子性,除非你能确保只有一个线程对变量执行写操作。原子变量提供了“读-改-写”的原子操作,并且常常用做一种更好的volatile变量,
cas CompareAndSet (比较并交换)保证原子性
juc包下的原子类用 cas+voliate(无锁形态)保证了线程安全
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
public void withdraw(Integer amount) {
while(true) {
// 需要不断尝试,直到成功为止
while (true) {
// 比如拿到了旧值 1000
int prev = balance.get();
// 在这个基础上 1000-10 = 990
int next = prev - amount;
/*
compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
- 不一致了,next 作废,返回 false 表示失败
比如,别的线程已经做了减法,当前值已经被减成了 990
那么本线程的这次 990 就作废了,进入 while 下次循环重试
- 一致,以 next 设置为新值,返回 true 表示成功
*/
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
}
原子引用以及ABA问题
放入原子引用AtomicReference中的泛型就是要保证线程安全的数据
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
ABA问题
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
只要有其它线程动过了共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
sleep(1);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(0.5);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
ref.compareAndSet(prev, "C", stamp, stamp + 1) ,改过之后,版本号+1
AtomicMarkableReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
@Slf4j
public class TestABAAtomicMarkableReference {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
CAS的底层实现Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
Unsafe CAS 操作
@Data
class Student {
volatile int id;
volatile String name;
}
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);
// 输出结果 Student(id=20, name=张三)
不可变类和无状态类
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在Java 中有很多(String等包装类)
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}
final 的使用,发现该类、类中所有属性都是 final 的
属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
保护性拷贝
使用字符串时,也有一些跟修改相关的方法啊,比如 substring
等,那么下面就看一看这些方法是如何实现的,就以 substring
为例
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
// 而是去新创建了一个字符串
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
无状态
在 web 阶段学习时,设计 Servlet
时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的,因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为无状态
线程池
java JDK JUC包下的线程池 ThreadPoolExecutor
线程池状态
ThreadPoolExecutor
使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高三位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | |
STOP | 001 | N | N | |
TIDYING | 010 | - | - | |
TERMINATED | 011 | - | - |
从数字上比较, TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略
工作方式
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。救急线程 = 最大线程数目 - 核心线程数目
生存时间和时间单位针对救急线程
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
拒绝策略AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
线程池工具类Executors
根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,包括newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务适用于任务量已知,相对耗时的任务
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程为0,全部都是救急线程(60s 后可以回收)
救急线程可以无限制创建整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
队列采用了 SynchronousQueue
实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
输出结果,可以看到放不进去,只有线程来取值才放进去取出
11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改
对外暴露的是ThreadPoolExecutor
对象,可以强转后调用setCorePoolSize
等方法进行修改
提交任务
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
关闭线程池
shutdown:不会接收新任务 - 但已提交任务会执行完 - 此方法不会阻塞调用线程的执行
shutdownNow:不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务
任务调度线程池
在没有引用任务调度线程池之前使用的是Timer实现任务的延时执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
Fork/Join 线程池
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
提交给 Fork/Join
线程池的任务需要继承 RecursiveTask
(有返回值)或 RecursiveAction
(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务
@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);
// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}
ForkJoinPool
来执行
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}
// 输出结果
// [ForkJoinPool-1-worker-0] - fork() 2 + {1}
// [ForkJoinPool-1-worker-1] - fork() 5 + {4}
// [ForkJoinPool-1-worker-0] - join() 1
// [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
// [ForkJoinPool-1-worker-2] - fork() 4 + {3}
// [ForkJoinPool-1-worker-3] - fork() 3 + {2}
// [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
// [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
// [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15
J.U.C
AQS原理
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改:
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过protected类型的getState,setState,compareAndSetState进行操作:
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 对资源的共享方式
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
自定义同步器时需要重写下面几个AQS提供的模板方法
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException,这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用
AQS内部依赖阻塞队列,当获取失败,加入队列,每个node都是一个线程任务,AQS同步组件提供了CAS方法,维护state 排它、获取等,是JUC的基石,包括四个同步工具类,以及锁
传送门:并发工具类详解
更多推荐
所有评论(0)