作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】   

大家有没有发现,其实 「 一文秒懂 」 系列讲述的都是多线程并发开发的问题。这个话题太大了,估计没有上百篇文章都解释不清楚。

本文,我们来讲解下 Java 并发中的基础的基础,核心的核心,Java 并发编程中的最基本的机制之一 - 「 线程同步 」

为了方便你理解并发编程中的各种概念和术语,我们首先会来一阵扫盲,讨论一些基本的并发相关术语和方法。接着,我们将开发一个简单的应用程序,并在合格应用程序里处理并发问题,以方便大家理解和巩固 wait() 和 notify()

Java 中的线程同步 ( Thread Synchronization )

在并发编程中,在多线程环境下,多个线程可能会尝试修改同一资源。如果线程管理不当,这显然会导致一致性问题。

Java 中的哨兵块 ( guarded block )

Java 中,可以用来协调多个线程操作的一个工具是 「 哨兵块 」。这个哨兵块会在恢复执行前检查特定条件。

基于这种哨兵检查的思想,Java 在所有类的基类 Object 中提供了两个方法

方法说明
Object.wait()暂停一个线程
Object.notify()唤醒一个线程

是不是有点难以理解,别担心,看下面这个图,这个图描绘了线程的的生命周期。

虽然从上图中可以看出,有多个方法可以控制一个线程的生命周期,但本章节,我们只讨论 notify() 方法和 wait() 方法

wait() 方法

对照上图,简单的说,当我们调用 wait() 时会强制当前线程等待,直到某个其它线程在同一个对象上调用 notify() 或 notifyAll() 方法。

因此,当前线程必须拥有对象的监视器。根据 Java docs 的说法,这可能发生在

  • 我们已经为给定对象执行了同步实例方法
  • 我们已经在给定对象上执行了 synchronized 块的主体
  • 通过为 Class 类型的对象执行同步静态方法

请注意,一次只有一个活动线程可以拥有对象的监视器。

除了无参数 wait() 方法外,Java 还重载了另一个 wait() 方法

wait() 方法

wait() 方法导致当前线程无限期地等待,直到另一个线程调用此对象的 notify() 或 notifyAll() 方法

wait(long timeout) 方法

使用此方法,我们可以指定一个超时,在此之后将自动唤醒线程。

当然了,我们可以在到达超时之前使用 notify() 或 notifyAll() 提前唤醒线程。

请注意,调用 wait(0) 与调用 wait() 相同

wait(long timeout, int nanos)

这是与 wait(long timeout) 提供相同功能的签名,唯一的区别是我们可以提供更高的精度。

该方法计算超时之间的方式为:

    总超时时间(以纳秒为单位)= 1_000_000 * 超时 + nanos

notify() 或 notifyAll() 方法

notify() 和 notifyAll() 方法用于唤醒等待访问此对象监视器的线程。

它们以不同的方式通知等待线程。

notify() 方法

对于在此对象的监视器上等待的所有线程(通过使用任何一个重载 wait() 方法 ),notify() 通知将会随机唤醒任何一个线程。

也就是说,我们并不能确切知道唤醒了哪个线程,这取决于实现。

因为 notify() 提供了唤醒一个随机线程的机制,因此它可用于实现线程执行类似任务的互斥锁定。

但在大多数情况下,使用 notifyAll() 会是一个更可行的方案。

notifyAll() 方法

notifyAll() 方法用于唤醒正在此对象的监视器上等待的所有线程。唤醒的线程将以常规的方式完成 - 就像任何其他线程一样。

但,有一点要注意的是,对于任意一个线程,但在我们允许其继续执行之前,请始终快速检查继续执行该线程所需的条件。因为在某些情况下线程被唤醒而没有收到通知(这个场景将在后面的例子中讨论 )

发送者 - 接收者同步问题

线程同步的问题,我们已经有了个大概的了解,接下来,我们看一个简单的 Sender-Receiver ( 发送者 - 接收者 ) 应用程序,这个应用程序将利用wait() 和 notify() 方法建立它们之间的同步。

  • 发送者应该向接收者发送数据包
  • 在发送方完成发送之前,接收方无法处理数据包
  • 同样,发送方不得尝试发送另一个数据包,除非接收方已处理过上一个数据包

我们首先创建一个 Data 类,用于包含将从 Sender 发送到 Receiver 的数据包,同时,我们将使用 wait() 和 notifyAll() 来设置它们之间的同步。

    public class Data {
        private String packet;
    
        // True if receiver should wait
        // False if sender should wait
        private boolean transfer = true;
    
        public synchronized void send(String packet) {
            while (!transfer) {
                try { 
                    wait();
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
            transfer = false;
    
            this.packet = packet;
            notifyAll();
        }
    
        public synchronized String receive() {
            while (transfer) {
                try {
                    wait();
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
            transfer = true;
    
            notifyAll();
            return packet;
        }
    }

范例有点小长,我们一步一步分析下代码

  1. 私有属性 packet 用于表示通过网络传输的数据

  2. 布尔类型的私有属性 transfer 用于 Sender 和 Receiver 之间的同步

    • 如果此变量为 true,则 Receiver 应等待 Sender 发送消息
    • 如果它是 false ,那么 Sender 应该等待 Receiver 接收消息
  3. Sender 使用 send() 方法将数据发送给 Receiver:

    • 如果 transfer 为 false ,我们将在此线程上调用 wait()
    • 但如果它为 true ,我们需要切换状态,设置我们的消息并调用 notifyAll() 来唤醒其他线程以指定发生了重大事件,然后这些线程它们自己可以自查是否可以继续执行。
  4. 同样的, Receiver 将使用 receive() 方法接收数据

    • 如果 Sender 将传输设置为 false,那么继续,否则将在此线程上调用 wait()
    • 满足条件时,我们切换状态,通知所有等待的线程唤醒并返回 Receiver 的数据包

为什么在 while 循环中包含 wait()

由于 notify() 和 notifyAll() 随机唤醒正在此对象监视器上等待的线程,因此满足条件并不总是很重要。有时可能会发生线程被唤醒,但实际上并没有满足条件。

当然了,跟进一步说,我们还可以定义一个检查来避免虚假唤醒 - 线程可以从等待中醒来而不会收到通知。

我们为什么需要同步 send() 和 receive() 方法

我们将这些方法放在 synchronized 方法是为了提供内部锁。

如果调用 wait() 方法的线程不拥有固有锁,则会抛出错误。

现在,是时候创建 Sender 和 Receiver 并在两者上实现 Runnable 接口,以便它们的实例可以由线程执行。

我们先来看看 Sender 将如何工作

    public class Sender implements Runnable {
        private Data data;
    
        // standard constructors
    
        public void run() {
            String packets[] = {
              "First packet",
              "Second packet",
              "Third packet",
              "Fourth packet",
              "End"
            };
    
            for (String packet : packets) {
                data.send(packet);
    
                // Thread.sleep() to mimic heavy server-side processing
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
        }
    }

对于这个 Sender :

  • 我们正在创建一些随机数据包,这些数据包将通过网络以 packet[] 数组的形式发送
  • 对于每个数据包,我们只是调用 send() 而不做其它动作
  • 然后我们用随机时间间隔调用 Thread.sleep() 来模仿繁重的服务器端处理

接下来,我们来看看如何实现 Receiver

    public class Receiver implements Runnable {
        private Data load;
    
        // standard constructors
    
        public void run() {
            for(String receivedMessage = load.receive();
              !"End".equals(receivedMessage);
              receivedMessage = load.receive()) {
    
                System.out.println(receivedMessage);
    
                // ...
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
        }
    }

上面这段代码很简单,只是在循环中调用 load.receive() ,直到我们得到最后一个 “End” 数据包。

最后,我们就可以写一个 main() 方法来运行它们了

    public static void main(String[] args) {
        Data data = new Data();
        Thread sender = new Thread(new Sender(data));
        Thread receiver = new Thread(new Receiver(data));
    
        sender.start();
        receiver.start();
    }

运行范例,输出结果如下

    First packet
    Second packet
    Third packet
    Fourth packet

完美!

我们在这里 - 我们以正确的顺序接收所有数据包,并成功建立了发送方和接收方之间的正确通信。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐