概念

为什么学

平时应用层开发用不到并发编程,或者用的较少,但是tomcat、以及消息队列以及一些框架的底层一定涉及并发编程。岁月静好,不过是有人负重前行。本篇记载有限,取材自黑马并发编程,详细笔记可参考:java并发编程大全

进程

是操作系统的应用对应一个进程,浏览器是一个进程,网易云音乐也是一个进程,支持多开的软件叫多进程,为了加载数据之内存

线程

一个进程中包含一个或者多个线程,线程可以访问内存中的共享变量

并发

就是指在一定时间内处理事情的能力叫并发,在单核cpu下,以任务调度器在一定时间内以时间片方式在多个线程下切换,总结就是宏观并行,微观串行。

并行

真正同时的执行两个线程叫并行,在多核cpu下,两个核心同时处理两个线程,但真正的场景下却是线程数远高于cpu核心数量,所以在运行的时候是 并发+并行同时存在

同步异步

方法调用者需要等待结果返回叫同步,方法调用者不需要等待结果返回,直接进行下一步操作叫异步

public class test {
    public static void main(String[] args) {
        FileReader.read(Constants.MP4_FULL_PATH); // 同步操作
        System.out.println("do other things");
    }
}
// 同步读取,读取文件为耗时io操作,
// 日志打印为
// [mian] 读取文件
// [main] do other things
// 表示为主线程同步执行,do other things在读取文件之后输出
public class test {
    public static void main(String[] args) {
        new Thread(()->FileReader.read(Constants.MP4_FULL_PATH)).start(); // 异步操作
        System.out.println("do other things");
    }
}
// 日志打印为
// [mian] do other things
// [thread-0] 读取文件
// 主线程进来,重新开启了一个线程读取文件,do other things 并没有等待读取文件结束继续执行,这种情况为异步

在java中,只能使用多线程的方法让方法执行变为异步执行,tomcat的异步servlet也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞tomcat工作线程

线程应用

提高效率,充分利用多核cpu的优势并行跑多个线程执行,提高运行效率,只有多核cpu才是提高效率,单核cpu依然是并发轮询,并不是所有计算任务都能拆分,参考[阿姆达尔定律]

io操作不占用cpu,只是我们使用的一般是[阻塞io],这是线程虽然不占用cpu,但需要等待io结束,没能充分利用线程,所以有[非阻塞io]以及[异步io]的优化

线程

创建线程

1、继承Thread类

2、实现Runnable接口

3、实现callable接口(有返回值)

4、线程池创建线程

这里只介绍后两种创建方法。

讨论一下 Thread与Runnable之间的关系

第一种方式创建线程,覆盖了run方法,直接就是一个线程+任务的形式,将线程和要执行的任务合并到一起啦,第二种方式就是将线程和任务分开啦 new Thread(Runnable),将Runnable任务传进去,最终执行的都是run方法,更推荐使用第二种方式,更容易与一些线程池的高级api结合,让任务脱离了Thread的继承体系,更加灵活

callable创建线程

FutureTask本质是实现了Runnable接口和Future接口(用来返回结果的)

Future接口定义获取返回值,获取状态等

Runnable接口线程接口

RunnableFuture实现了 Runnable和 Future

Runnable、Callable、Future以及RunnableFuture都是接口,真正干活还得靠FutureTask

FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable与Future接口,因此FutureTask既可以作为一个Runnable被Thread执行,也可以获取到Future异步计算的结果

FutureTask构造方法可以传入Runnable和Callable实现类,在线程池中做为一个任务提交submit

Future机制还是捉襟见肘,比如任务的串行执行,并行执行等,所以有了CompletableFuture的出现

CompletableFuture 可以依赖上次的执行结果,执行下次串行化,也可合并两次执行结果,完成之后进行回调

具体请参考:什么,你还不会用CompletableFuture?

可以参考 Runnable、Future、Callable、FutureTask之间的关系

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务对象 Integer就是返回结果,传入的lamdba就是callable的call方法
        FutureTask<Integer> integerFutureTask = new FutureTask<>(() -> {
            return 100;
        });
        // 线程对象,传入任务对象
        new Thread(integerFutureTask,"taskDemo").start();
        // 主线程阻塞,同步等待task返回结果
        Integer integer = integerFutureTask.get();

        System.out.println("测试Callable接口");
    }
}

 线程运行原理

在java内存结构中,堆、栈、方法区,其中栈就是给线程用的,程序计数器存储本线程的调用位置,调用方式就是栈帧调用。类加载之后就会把class信息放到方法区,在由虚拟机创建线程,栈,程序计数器为线程私有,每个线程可以访问共享内存中的数据(线程安全)

线程上下文切换

因为以下原因导致cpu不在执行当前线程,转而执行另一个线程的代码,需要保存当前线程的状态,并且恢复另一个线程的状态为上下文切换

  • 线程的cpu时间片用完
  • 垃圾回收
  • 又更高优先级的线程需要运行
  • 线程自己调用了sleep、yield、wait、join、park(LockSupport.park() 锁的支持类方法,停止线程)、synchronized、lock等方法

线程常见方法

join  //等待线程结束 

setPriority(int) 设置优先级 最大10 默认5

isAlive 线程是否存活

isInterrupted() 判断是否被打断

interrupt() 打断线程,如果被打断的线程正在 sleep,wait,join 会导致被打断的线程抛出InterruptedException,并清除打断标记,如果打断真在运行的线程,则会设置打断标记,park线程被打断,也会设置打断标记

sleep(long n)放弃cpu使用权 睡觉

yield() 提示线程调度器让出cpu调度

sleep与yield的区别

slepp会让当前线程处于等待状态,其他线程可以打断正在睡眠的线程,会抛出异常,yield让当前线程从运行状态转为就绪状态,调度执行同优先级的线程,这时如果没有同优先级的线程,那么不能保证让当前线程停止的效果,Thread.sleep(10)在哪个线程里调用就是让哪个线程休眠,比如主线程等,其他方法也一致

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new Thread(()->{
            try {
                // 此线程睡2s
                Thread.sleep(2000);
            }catch (InterruptedException interruptedException){
                System.out.println("别烦我");
                interruptedException.printStackTrace();
            }
        }).start();
        // 主线程睡1s
        Thread.sleep(1000);
        // 主线程打断傻上边线程,抛出InterruptedException
        Thread.interrupted();
    }
}
public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable task1 = ()->{
            Thread.yield(); // 让出执行权,执行同级别优先级的线程 让别人先执行
            System.out.println("1");
        };
        Runnable task2 = ()->{
            
            System.out.println("2");
        };
        Thread t1 = new Thread(task1,"t1");
        Thread t2 = new Thread(task2,"t2");
        // 设置优先级
        t1.setPriority(Thread.MAX_PRIORITY);
        t2.setPriority(Thread.MIN_PRIORITY);

        t1.start();
        t2.start();
    }
}

join(等待线程结束

public class test {

    static int r = 0;
    public static void main(String[] args) throws ExecutionException, InterruptedException {
      test1();
    }

    private static void test1() throws InterruptedException {
        System.out.println("开始");
        Thread t1 = new Thread(()->{
            System.out.println("开始");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束");
            r=10;
        },"t1");
        t1.start();
        t1.join();  // 等待t1线程结束 
        System.out.println(r);
        System.out.println("结束");
    }
}

如果没有t1.join(),打印结果为0,因为主线程直接打印0,t1线程赋值为10的时候,主线程已经打印完毕,这个时候,没有线程去获取r的值。

解决办法 join 等待t1线程结束之后在打印,sleep可以吗?,也可以做到,但是你不知道t1线程需要等待多长时间,无法具体给定睡眠时间,join是唯一的解决办法,等到线程结束。

join同步,加上join之后,主线程必须等待t1结束,就是同步等待

线程设计模式(两阶段终止模式interrupt)

在一个线程t1种如何优雅的终止线程t2,这里的优雅是给t2一个料理后事的机会,错误做法使用stop终止,如果线程持有锁,那么被杀死之后没有机会释放锁,其他线程也就永远无法得到锁

public class test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
        twoPhaseTermination.start();
        Thread.sleep(3500);
        twoPhaseTermination.stop();

    }


}
class TwoPhaseTermination{
    private Thread monitor;
    // 启动监控线程
    public void start(){
        monitor = new Thread(()->{
            while(true){
                // 获取当前线程 此处不能直接Thread.isInterrupted 不是静态方法,必须线程对象调用
                Thread current = Thread.currentThread();
                // 判断是否被打断
                if(current.isInterrupted()){
                    System.out.println("料理后事");
                    break;
                }
                try {
                    Thread.sleep(1000);
                    System.out.println("监控记录");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // 重新设置打断标记 因为sleep被打断会清除打断标记
                    current.interrupt();
                }
            }
        });
        monitor.start();
    }
    // 停止监控线程
    public void stop(){
        monitor.interrupt();
    }
}

   输出结果监控记录监控记录监控记录料理后事

    主线程调用start 启动一个线程,轮询,如果当前线程被打断,则料理后事,如果没有被打断,则睡眠,执行监控记录,如果在睡眠中被打断,需要重新设置打断标记,让判断进入料理后事输出。

如果打断park被打断了 打断标记为真的话 不会被阻塞,就停不下来啦,需要手动设置false,Thread.interrupted()设置

  public static void main(String[] args) throws ExecutionException, InterruptedException {
       Thread t1 = new Thread(()->{
           System.out.println("park...");
           LockSupport.park();
           //重制打断标记
           Thread.interrupted();
           System.out.println("打断状态"+Thread.currentThread().isInterrupted());
       },"t1");

       t1.start();

       Thread.sleep(1000);
       // 打断t1
       t1.interrupt();


    }

守护线程

默认会等待所有非守护线程结束在结束,即使守护线程的代码没有执行完,也会强制结束,在java中垃圾回收就是守护线程

public class test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
       Thread t1 = new Thread(()->{
           System.out.println("...");
       },"t1");
       t1.setDaemon(true);
       t1.start();
       Thread.sleep(1000);
       // 打断t1
       t1.interrupt();
       
    }


}

线程状态

 线程安全

  在操作共享变量的时候,单线程没有问题,多线程的情况下,由于上下文切换导致指令错乱,变量操作并不是原子性操作,会有意想不到的结果产生

线程安全三大特性

原子性:原子性问题是一组cpu指令必须满足原子性,例如i++举例,分为三步,加载到寄存器,取值,赋值等,根本原因在于线程切换导致的问题

可见性: 可见性问题是某个线程对于线程共享变量的修改对其他线程可见,能及时的刷新回主内存,单核cpu没有可见性问题,因为用的一个cpu,根本原因在于cpu缓存导致的问题

有序性:有序性问题是针对代码,编译器会做出优化,也叫指令重排序,重排序遵从as-if-seris原则,保证单线程下的结果不变,对有依赖关系的共享变量不会做指令重排序,例如读后写等问题,根本原因在于指令重排序导致的问题,而不是线程切换,没有发生线程切换,指令重排序就可以导致严重的问题,比如单例模式

传送门:线程安全三大特性

解决线程安全三大特性:

synchronized 保证原子性,锁总线,保证可见性,强制刷新到主存,保证有序性,添加读写屏障

volatile保证可见性,刷新回主存,保证有序性,不被指令重排序,内存屏障,但是不保证读写的原子性,也可以换句话说,只保证单读,或者单写的原子性

cas保证原子性,compareAndSwap,底层与synchronized用的汇编指令一致,都是采用Lock 汇编指令

原子类等都是cas 加锁,以及用volatile保护资源

临界区

  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源
  • 多个线程读共享资源也是没有问题的
  • 多个线程对共享资源进行写操作时会发生指令交错

一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区

public class test {
    static int counter = 0;
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        incrment();
    }

    public static void incrment()
    // 此处为临界区 ,对共享资源进行读写操作
    {
        counter++;
    }

}

 竞态条件 Race Condition

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

访问临界区共享变量的解决方案

1、阻塞解决方案:synchronized(对象锁)加锁串行,Lock

2、非阻塞解决方案:原子变量

语法格式

   synchronized (对象)
        {
            //临界区
        }

synchronized 不会被上下文切换而打断,继续拥有锁,等待下次执行在继续执行,释放锁,唤醒其他线程竞争 获取锁,synchronized必须锁住的是同一个对象 

方法上的synchronized

public synchronized void incrment(){
        counter ++;
}
等价于
public void incrment(){
    synchronized(this){
        counter ++;    
    }       
}

public synchronized static void incrment(){
        counter ++;
}
等价于 静态没有this 在jvm中只有一个对象
public static void incrment(){
    synchronized (Test.class){
        counter ++;    
    }

}

成员变量和静态变量是否线程安全?

       静态变量不依赖特定的实例对象,而是被类的所有实例共享,位于方法区,所以线程不安全

  • 如果它们没有被共享,则线程安全
  • 如果它们被共享啦,根据他们的状态是否能够改变,又分为两种情况
  •  如果只有读操作,则线程安全
  •  如果有读写操作,这段代码是临界区,需要考虑线程安全

局部变量是否线程安全?

  • 局部变量是线程安全的
  • 但局部变量引用的对象则未必
  •         如果该对象没有逃离方法的作用访问,它是线程安全的
  •         如果该对象逃离方法的作用范围,需要考虑线程安全(对象逃逸)
public class test {
    List<String> a = new ArrayList<>();


    public  void method1(){
        a.add("1");
    }

    public void method2(){
        a.remove(0);
    }
}
// 这里的a 引用的都是同一个list对象,所以存在线程安全问题

private 和final限制子类,保证安全

public class test {


    public static void main(String[] args) throws ExecutionException, InterruptedException {

    }

    public void method1(){
        // 此时每个线程来都会创建一个list对象
        List<String> a = new ArrayList<>();
        method2(a);
        // 如果子类中重写了method3并且开启了多线程,那么传入的a就是同一个list,会造成多个线程访问一个变量,产生线程安全问题
        method3(a);
    }
    // 本方法中调用不会有问题,public也不会有问题,但是 public 如果有
    // 子类实现了 method1方法,并开启了一个线程,使用List 会造成两个线程争夺一个资源的情况
    // 所以 private 和final可以在一定程度上提供安全
    private  void method2(List<String> a){
        a.add("1");
    }

    private void method3(List<String> a){
        a.remove(0);
    }
}

 如果子类中重写了method3并且开启了多线程,那么传入的a就是同一个list,会造成多个线程访问一个变量,产生线程安全问题,不能控制子类行为,所以局部变量发生了逃逸,因为,用private和final禁止子类重写。

常见线程安全类

String 不可变类

Integer等包装类 不可变类

StringBuffer

Random

Vector

Hashtable

java.util.concurrent包下的所有类

这里说他们线程安全是指,多个线程调用它们同一个实例的某一个方法时,是线程安全的,说白了也就是锁的是同一个对象

每一个操作都是原子的,但是多个方法组合就不是线程安全的

举例:

Hashtable table = new Hashtable

if(table.get("key")!=null){

        // 两个线程都进入到这里,线程上下文切换,就产生了线程安全问题

        table.put("key",value);

}

public final class String不可变类,不可以修改能够保证线程安全,比如 subStirng方法,源码是创建了新的字符串

synchronized底层原理之Mionitor

Monitor

Monitor可以理解为一种同步工具,也可理解为一种同步机制,常常被描述为一个Java对象。Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor锁。(所以每个对象都能成为一把锁)

 Monitor是在jvm底层实现的,底层代码是c++。本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的转换,状态转换需要耗费很多的处理器时间成本非常高。所以synchronized是Java语言中的一个重量级操作。 
Monitor与java对象以及线程是如何关联 ?

  • 如果一个java对象被某个线程锁住,则该java对象的Mark Word字段中LockWord指向monitor的起始地址
  • Monitor的Owner字段会存放拥有相关联对象锁的线程id

Monitor被翻译为 监视器管程

每个java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置成指向Monitor对象(是操作系统的对象)的指针

Monitor对象中有属性 Owner 所有者, EntryList 阻塞队列,waitSet,等待区

当有线程,进入synchronized就会检测所有者Monitor,如果有所有者已经存在,则进入EntryList阻塞队列进行阻塞,不加synchronized不会关联Monitor,加了之后只对同一个对象有作用。

synchronized优化原理(jdk6之后)

 在Java早期版本中,synchronized属于重量级,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,挂起线程和恢复线程都需要转入内核态去完成,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成

锁升级过程无锁==>偏向锁==>轻量级锁==>重量级锁

偏向锁

  Synchronized用的锁是存在Java对象头里的MarkWord中,锁升级功能主要依赖MarkWord中锁标志位和释放偏向锁标志位

 在实际应用运行过程中发现,“锁总是同一个线程持有,很少发生竞争”,也就是说锁总是被第一个占用他的线程拥有,这个线程就是锁的偏向线程。

如果相等表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了,直到竞争发生才释放锁。以后每次同步,检查锁的偏向线程ID与当前线程ID是否一致,如果一致直接进入同步。无需每次加锁解锁都去CAS更新对象头。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高
     
 假如不一致意味着发生了竞争,锁已经不是总是偏向于同一个线程了,这时候可能需要升级变为轻量级锁,才能保证线程间公平竞争锁。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程是不会主动释放偏向锁的。

轻量级锁

轻量级锁的使用场景:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。

原理是:使用线程使用cas来更新对象头的上标识更新到自己线程之上,如果成功,则持有轻量级锁,如果失败,则cas自旋不断尝试(自旋次数是自适应的,根据同一个锁上一次自旋的时间和拥有锁线程的状态来决定),尝试一定次数,线程在尝试对一个对象加轻量级锁的过程中,如果CAS操作无法成功则表明有其他线程已经为此对象加上了轻量级锁,此时将进行锁膨胀,将轻量级锁变成重量级锁升级为重量级锁

锁消除,这个也很简单,就是在不存在锁竞争的地方使用了synchronized,jvm会自动帮你优化掉,比如在方法中new了一个对象,在加锁,该对象本就是线程私有的,不存在竞争,所以jvm会自动消除该锁。

轻量级锁对使用者是透明的,即语法仍然是synchronized,会优先用轻量级锁(cas替换Object的Mark word(对象头))加锁

假设有两个同步块,利用同一个对象加锁

   static final Object object = new Object();

    private  void method1(){
        // 添加锁,对象头中有标记01为轻量级锁,                  
       synchronized (object){
           // 同步块A
           method2();
       }

    }

    private void method2(){
       synchronized (object){
        // 同步块B
       }
    }

总的来说啊,JVM设计的这套synchronized锁升级的原则,主要是为了花费最小的代价能达到加锁的目的

比如在没有竞争的情况下,进入synchronized的使用使用偏向锁就够了,这样只需要第一次执行CAS操作获取锁,获取了偏向锁之后,后面每次进入synchronized同步代码块就不需要再次加锁了

然后在存在多个线程竞争锁的时候就不能使用偏向锁了,不能只偏心一个人,它优先获取锁,别人都看它表演,这样是不行的。于是就升级为轻量级锁

在轻量级锁模式在每次加锁和释放是都需要执行CAS操作,对比偏向锁来说性能低一点的,但是总体还是比较轻量级的。为了尽量提升线程获取锁的机会,避免线程陷入获取锁失败就立即沉睡的局面(线程沉睡再唤醒涉及上下文切换,用户态内核态切换,是一个非常重的操作,很费时间),所以设计自旋等待(默认10次);线程每次自旋一段时间之后再去重试获取锁。

当竞争非常激烈,并发很高,或者是synchronized代码块执行耗时比较长,就会积压大量的线程都在自旋,由于自旋是空耗费CPU资源的,也就是CPU在那等着,做不了其他事情,所以在尝试了最大的自旋次数之后;及时释放CPU资源,将线程挂起了。
park和unpark

它们都是LockSupport中的方法

// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先park 然后再unpark

Thread t1 = new Thread(() -> {
	log.debug("start...");
	sleep(1);
	log.debug("park...");
	LockSupport.park();
	log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

// 输出结果
// 18:42:52.585 c.TestParkUnpark [t1] - start...
//18:42:53.589 c.TestParkUnpark [t1] - park...
//18:42:54.583 c.TestParkUnpark [main] - unpark...
// 18:42:54.583 c.TestParkUnpark [t1] - resume...

先unpark在park

Thread t1 = new Thread(() -> {
	log.debug("start...");
	sleep(2);
	log.debug("park...");
	LockSupport.park();
	log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

// 输出结果
//18:43:50.765 c.TestParkUnpark [t1] - start...
//18:43:51.764 c.TestParkUnpark [main] - unpark...
//18:43:52.769 c.TestParkUnpark [t1] - park...
//18:43:52.769 c.TestParkUnpark [t1] - resume...

先unpark后park

在park()之前,先执行了unpark(),进而释放了一个许可,也就是说当前线程有一个可用的许可。而park()在有可用许可的情况下,是不会阻塞线程的。

与wait和notify对比

1、park和unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确

2、wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,也就是必须在同步块内使用,而 park,unpark 不必

3、可以先 unpark,而 wait & notify 不能先 notify

 多把锁(细粒度锁,互不相干的操作)

一间大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低
解决方法是准备多个房间(多个对象锁)

对整个房间加锁

class BigRoom {
	public void sleep() {
		synchronized (this) {
			log.debug("sleeping 2 小时");
			Sleeper.sleep(2);
		}
	}
	public void study() {
		synchronized (this) {
			log.debug("study 1 小时");
			Sleeper.sleep(1);
		}
	}
}


// 执行
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
	bigRoom.compute();
},"小南").start();
new Thread(() -> {
	bigRoom.sleep();
},"小女").start();

优化两个锁

class BigRoom {
	private final Object studyRoom = new Object();
	private final Object bedRoom = new Object();
	public void sleep() {
		synchronized (bedRoom) {
			log.debug("sleeping 2 小时");
			Sleeper.sleep(2);
		}
	}
	public void study() {
		synchronized (studyRoom) {
			log.debug("study 1 小时");
			Sleeper.sleep(1);
		}
	}
}

线程活跃性问题

死锁(互相持有对方的锁)、活锁(互相操作对方结束条件,永不结束)饥饿(线程被调用次数非常少)

一个线程需要同时获取多把锁,这时就容易发生死锁

Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
	synchronized (A) {
		log.debug("lock A");
		sleep(1);
		synchronized (B) {
			log.debug("lock B");
			log.debug("操作...");
		}
	}
}, "t1");
Thread t2 = new Thread(() -> {
	synchronized (B) {
		log.debug("lock B");
		sleep(0.5);
		synchronized (A) {
			log.debug("lock A");
			log.debug("操作...");
		}
	}
}, "t2");
t1.start();
t2.start();


// 12:22:06.962 [t2] c.TestDeadLock - lock B
// 12:22:06.962 [t1] c.TestDeadLock - lock A

定位死锁

检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:

cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
22816 KotlinCompileDaemon
33200 TestDeadLock // JVM 进程
11508 Main
28468 Launcher
cmd > jstack 33200
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
2018-12-29 05:51:40
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):
"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000003525000 nid=0x2f60 waiting on condition
[0x0000000000000000]
	java.lang.Thread.State: RUNNABLE
"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry
[0x000000001f54f000]
	java.lang.Thread.State: BLOCKED (on object monitor)
		at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
		- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
		- locked <0x000000076b5bf1d0> (a java.lang.Object)
		at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
		at java.lang.Thread.run(Thread.java:745)
"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001eb68800 nid=0x1b28 waiting for monitor entry
[0x000000001f44f000]
	java.lang.Thread.State: BLOCKED (on object monitor)
		at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
		- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
		- - locked <0x000000076b5bf1c0> (a java.lang.Object)
		at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
		at java.lang.Thread.run(Thread.java:745)
// 略去部分输出
Found one Java-level deadlock:
=============================
"Thread-1":
	waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
	which is held by "Thread-0"
"Thread-0":
	waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
	which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
	at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
	- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
	- locked <0x000000076b5bf1d0> (a java.lang.Object)
	at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:745)
"Thread-0":
	at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
	- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
	- locked <0x000000076b5bf1c0> (a java.lang.Object)
	at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束

public class TestLiveLock {
	static volatile int count = 10;
	static final Object lock = new Object();
	public static void main(String[] args) {
		new Thread(() -> {
			// 期望减到 0 退出循环
			while (count > 0) {
				sleep(0.2);
				count--;
				log.debug("count: {}", count);
			}
		}, "t1").start();
		new Thread(() -> {
			// 期望超过 20 退出循环
			while (count < 20) {
				sleep(0.2);
				count++;
				log.debug("count: {}", count);
			}
		}, "t2").start();
	}
}

饥饿:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,不易演示,略

ReentrantLock

可中断  
可以设置超时时间 
可以设置为公平锁  
支持多个条件变量

可重入

// 获取锁
reentrantLock.lock();
try {
    // 临界区
} finally {
    // 释放锁
    reentrantLock.unlock();
}

可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
	method1();
}
public static void method1() {
	lock.lock();
	try {
		log.debug("execute method1");
		method2();
	} finally {
		lock.unlock();
	}
}
public static void method2() {
	lock.lock();
	try {
		log.debug("execute method2");
		method3();
	} finally {
		lock.unlock();
	}
}
public static void method3() {
	lock.lock();
	try {
		log.debug("execute method3");
	} finally {
		lock.unlock();
	}
}

// 输出结果
//17:59:11.862 [main] c.TestReentrant - execute method1
//17:59:11.865 [main] c.TestReentrant - execute method2
//17:59:11.865 [main] c.TestReentrant - execute method3

可打断

ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
	log.debug("启动...");
	try {
		lock.lockInterruptibly(); //    设置可打断,如果是lock.lock()则不可打断
	} catch (InterruptedException e) {
		e.printStackTrace();
		log.debug("等锁的过程中被打断");
		return;
	}
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(1);
	t1.interrupt();
	log.debug("执行打断");
} finally {
	lock.unlock();
}

锁超时(试图等待锁)

ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
	log.debug("启动...");
	if (!lock.tryLock()) {
		log.debug("获取立刻失败,返回");
		return;
	}
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(2);
} finally {
	lock.unlock();
}

公平锁

ReentrantLock lock = new ReentrantLock(true);
公平锁一般没有必要,会降低并发度

条件变量 

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

await signal

static ReentrantLock lock = new ReentrantLock();
// 条件变量,相当于等待区
static Condition waitCigaretteQueue = lock.newCondition();
// 条件变量,相当于等待区
static Condition waitbreakfastQueue = lock.newCondition();

static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
	new Thread(() -> {
		try {
			lock.lock();
			while (!hasCigrette) {
				try {
					waitCigaretteQueue.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			log.debug("等到了它的烟");
		} finally {
			lock.unlock();
		}
}).start();
new Thread(() -> {
	try {
		lock.lock();
		while (!hasBreakfast) {
			try {
				waitbreakfastQueue.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		log.debug("等到了它的早餐");
	} finally {
		lock.unlock();
	}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
	lock.lock();
	try {
		log.debug("送烟来了");
		hasCigrette = true;
		waitCigaretteQueue.signal();
	} finally {
		lock.unlock();
	}
}
private static void sendBreakfast() {
	lock.lock();
	try {
		log.debug("送早餐来了");
		hasBreakfast = true;
		waitbreakfastQueue.signal();
	} finally {
		lock.unlock();
	}
}


// 输出结果
//18:52:27.680 [main] c.TestCondition - 送早餐来了
//18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
//18:52:28.683 [main] c.TestCondition - 送烟来了
//18:52:28.683 [Thread-0] c.TestCondition - 等到

共享模式内存

 java内存模型JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。
JMM 体现在以下几个方面:

原子性 - 保证指令不会受到线程上下文切换的影响,切换之后,数据可能出错 i++
可见性 - 保证指令不会受 cpu 缓存的影响  主内存中的数据与缓存中的数据是否一致
有序性 - 保证指令不会受 cpu 指令并行优化的影响 编译器优化顺序导致的问题

原子性问题

有序性问题

可见性问题

就是主内存中的数据和缓存区的数据不一致,导致的错误

voliate和synchronized和cas

synchronized 保证 原子性、可见性、有序性
voliate 保证可见性保证有序性,不被指令重排序,内存屏障volatile的语义不足以确保递增操作的原子性,除非你能确保只有一个线程对变量执行写操作。原子变量提供了“读-改-写”的原子操作,并且常常用做一种更好的volatile变量,
cas CompareAndSet (比较并交换)保证原子性
juc包下的原子类用 cas+voliate(无锁形态)保证了线程安全

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思

因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

public void withdraw(Integer amount) {
	while(true) {
		// 需要不断尝试,直到成功为止
		while (true) {
			// 比如拿到了旧值 1000
			int prev = balance.get();
			// 在这个基础上 1000-10 = 990
			int next = prev - amount;
			/*
			compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
			- 不一致了,next 作废,返回 false 表示失败
			比如,别的线程已经做了减法,当前值已经被减成了 990
			那么本线程的这次 990 就作废了,进入 while 下次循环重试
			- 一致,以 next 设置为新值,返回 true 表示成功
				*/
			if (balance.compareAndSet(prev, next)) {
				break;
			}
		}
	}
}

原子引用以及ABA问题

 放入原子引用AtomicReference中的泛型就是要保证线程安全的数据

class DecimalAccountSafeCas implements DecimalAccount {
	AtomicReference<BigDecimal> ref;
	public DecimalAccountSafeCas(BigDecimal balance) {
		ref = new AtomicReference<>(balance);
	}
	@Override
	public BigDecimal getBalance() {
		return ref.get();
	}
	@Override
	public void withdraw(BigDecimal amount) {
		while (true) {
			BigDecimal prev = ref.get();
			BigDecimal next = prev.subtract(amount);
			if (ref.compareAndSet(prev, next)) {
				break;
			}
		}
	}
}

ABA问题

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
只要有其它线程动过了共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

AtomicStampedReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
	log.debug("main start...");
	// 获取值 A
	String prev = ref.getReference();
	// 获取版本号
	int stamp = ref.getStamp();
	log.debug("版本 {}", stamp);
	// 如果中间有其它线程干扰,发生了 ABA 现象
	other();
	sleep(1);
	// 尝试改为 C
	log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}

private static void other() {
	new Thread(() -> {
		log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
		ref.getStamp(), ref.getStamp() + 1));
		log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(0.5);

new Thread(() -> {
	log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",ref.getStamp(), ref.getStamp() + 1));
	log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}

ref.compareAndSet(prev, "C", stamp, stamp + 1) ,改过之后,版本号+1

AtomicMarkableReference

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

class GarbageBag {
	String desc;
	public GarbageBag(String desc) {
		this.desc = desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	@Override
	public String toString() {
		return super.toString() + " " + desc;
	}
}
@Slf4j
public class TestABAAtomicMarkableReference {
	public static void main(String[] args) throws InterruptedException {
		GarbageBag bag = new GarbageBag("装满了垃圾");
		// 参数2 mark 可以看作一个标记,表示垃圾袋满了
		AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
		log.debug("主线程 start...");
		GarbageBag prev = ref.getReference();
		log.debug(prev.toString());
		new Thread(() -> {
			log.debug("打扫卫生的线程 start...");
			bag.setDesc("空垃圾袋");
			while (!ref.compareAndSet(bag, bag, true, false)) {}
			log.debug(bag.toString());
		}).start();
		Thread.sleep(1000);
		log.debug("主线程想换一只新垃圾袋?");
		boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
		log.debug("换了么?" + success);
		log.debug(ref.getReference().toString());
	}
}

CAS的底层实现Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

public class UnsafeAccessor {
	static Unsafe unsafe;
	static {
		try {
			Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
			theUnsafe.setAccessible(true);
			unsafe = (Unsafe) theUnsafe.get(null);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new Error(e);
		}
	}
	static Unsafe getUnsafe() {
		return unsafe;
	}
}

Unsafe CAS 操作

@Data
class Student {
	volatile int id;
	volatile String name;
}
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);


// 输出结果 Student(id=20, name=张三)

不可变类和无状态类

如果一个对象在不能够修改其内部状态(属性)那么它就是线程安全的,因为不存在并发修改啊!这样的对象在Java 中有很多(String等包装类)
 

public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
	/** The value is used for character storage. */
	private final char value[];
	/** Cache the hash code for the string */
	private int hash; // Default to 0
	// ...
}

final 的使用,发现该类、类中所有属性都是 final 的

属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

保护性拷贝

使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是如何实现的,就以 substring 为例 

public String substring(int beginIndex) {
	if (beginIndex < 0) {
		throw new StringIndexOutOfBoundsException(beginIndex);
	}
	int subLen = value.length - beginIndex;
	if (subLen < 0) {
		throw new StringIndexOutOfBoundsException(subLen);
	}
    // 而是去新创建了一个字符串
	return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}

无状态

在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的,因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为无状态

线程池

java JDK JUC包下的线程池 ThreadPoolExecutor

 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名高三位接收新任务处理阻塞队列任务说明
RUNNING111YY
SHUTDOWN000NY
STOP001NN
TIDYING010--
TERMINATED011--

从数字上比较, TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

构造方法

public ThreadPoolExecutor(int corePoolSize,
						int maximumPoolSize,
						long keepAliveTime,
						TimeUnit unit,
						BlockingQueue<Runnable> workQueue,
						ThreadFactory threadFactory,
						RejectedExecutionHandler handler)

corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略

工作方式 

线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。

救急线程  =  最大线程数目 - 核心线程数目

生存时间和时间单位针对救急线程
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
拒绝策略

AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

线程池工具类Executors  

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,包括newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务

适用于任务量已知,相对耗时的任务

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
			60L, TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>());
}

核心线程为0,全部都是救急线程(60s 后可以回收)
救急线程可以无限制创建

整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货) 

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
	new Thread(() -> {
		try {
			log.debug("putting {} ", 1);
			integers.put(1);
			log.debug("{} putted...", 1);
			log.debug("putting...{} ", 2);
			integers.put(2);
			log.debug("{} putted...", 2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	},"t1").start();
	sleep(1);
	new Thread(() -> {
		try {
			log.debug("taking {}", 1);
			integers.take();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	},"t2").start();
	sleep(1);
	new Thread(() -> {
		try {
			log.debug("taking {}", 2);
			integers.take();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	},"t3").start();

输出结果,可以看到放不进去,只有线程来取值才放进去取出

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

newSingleThreadExecutor 

public static ExecutorService newSingleThreadExecutor() {
	return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
				0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>()));
}

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

提交任务 

// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

关闭线程池 

shutdown:不会接收新任务 - 但已提交任务会执行完 - 此方法不会阻塞调用线程的执行

shutdownNow:不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务

任务调度线程池

在没有引用任务调度线程池之前使用的是Timer实现任务的延时执行

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
	System.out.println("任务1,执行时间:" + new Date());
	try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
	System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

Fork/Join 线程池

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

 提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {
	int n;
	public AddTask1(int n) {
		this.n = n;
	}
	@Override
	public String toString() {
		return "{" + n + '}';
	}
	@Override
	protected Integer compute() {
		// 如果 n 已经为 1,可以求得结果了
		if (n == 1) {
			log.debug("join() {}", n);
			return n;
		}
		// 将任务进行拆分(fork)
		AddTask1 t1 = new AddTask1(n - 1);
		t1.fork();
		log.debug("fork() {} + {}", n, t1);
		// 合并(join)结果
		int result = n + t1.join();
		log.debug("join() {} + {} = {}", n, t1, result);
		return result;
	}
}

  ForkJoinPool 来执行

public static void main(String[] args) {
	ForkJoinPool pool = new ForkJoinPool(4);
	System.out.println(pool.invoke(new AddTask1(5)));
}

 // 输出结果
// [ForkJoinPool-1-worker-0] - fork() 2 + {1}
// [ForkJoinPool-1-worker-1] - fork() 5 + {4}
// [ForkJoinPool-1-worker-0] - join() 1
// [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
// [ForkJoinPool-1-worker-2] - fork() 4 + {3}
// [ForkJoinPool-1-worker-3] - fork() 3 + {2}
// [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
// [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
// [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15

J.U.C 

AQS原理

AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改:

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过protected类型的getState,setState,compareAndSetState进行操作:

//返回同步状态的当前值
protected final int getState() {
        return state;
}
 // 设置同步状态的值
protected final void setState(int newState) {
        state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

 AQS 对资源的共享方式

Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:

  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock

ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

自定义同步器时需要重写下面几个AQS提供的模板方法

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException,这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用

AQS内部依赖阻塞队列,当获取失败,加入队列,每个node都是一个线程任务,AQS同步组件提供了CAS方法,维护state 排它、获取等,是JUC的基石,包括四个同步工具类,以及锁

传送门:并发工具类详解

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐