当前位置: 首页 > news >正文

谈谈Java多线程离不开的AQS

如果你想深入研究Java并发的话,那么AQS一定是绕不开的一块知识点,Java并发包很多的同步工具类底层都是基于AQS来实现的,比如我们工作中经常用的Lock工具ReentrantLock、栅栏CountDownLatch、信号量Semaphore等,而且关于AQS的知识点也是面试中经常考察的内容,所以,无论是为了更好的使用还是为了应付面试,深入学习AQS都很有必要。

1. AQS简介

AQS初识

AQS,全名AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它是实现同步器的基础组件,如常用的ReentrantLock、Semaphore、CountDownLatch等。AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。

下面是AQS的组成结构。

AbstractQueuedSynchronizer主要由三部分组成,state同步状态、Node组成的CLH队列、ConditionObject条件变量(包含Node组成的条件单向队列),下面会分别对这三部分做介绍。

AbstractQueuedSynchronizer

先了解下AbstractQueuedSynchronizer提供的核心函数

AQS支持两种资源分享的方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

自定义的同步器继承AQS后,只需要实现共享资源state的获取和释放方式即可,其他如线程队列的维护(如获取资源失败入队/唤醒出队等)等操作,AQS在顶层已经实现了。

状态

  • getState():返回同步状态
  • setState(int newState):设置同步状态
  • compareAndSetState(int expect, int update):使用CAS设置同步状态
  • isHeldExclusively():当前线程是否持有资源

独占资源(不响应线程中断)

  • tryAcquire(int arg):独占式获取资源,子类实现
  • acquire(int arg):独占式获取资源模板
  • tryRelease(int arg):独占式释放资源,子类实现
  • release(int arg):独占式释放资源模板

共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
  • acquireShared(int arg):共享式获取资源模板
  • tryReleaseShared(int arg):共享式释放资源,子类实现
  • releaseShared(int arg):共享式释放资源模板

1.1 AQS的int变量

AQS中维护了一个用关键字volatile修饰同步状态变量state ,代表着该共享资源的状态一更改就能被所有线程可见,而AQS的加锁方式本质上就是多个线程通过CAS完成对state值的修改,当state为0时代表线程可以竞争锁,不为0时代表当前对象锁已经被占有。所以state的具体语义由实现者去定义,现有的ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch定义的state语义都不一样。

  • ReentrantLockstate用来表示是否有锁资源
  • ReentrantReadWriteLockstate16位代表读锁状态,低16位代表写锁状态
  • Semaphorestate用来表示可用信号的个数
  • CountDownLatchstate用来表示计数器的值

通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

1.2 AQS的CLH队列

CLHAQS内部维护的FIFO先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构。当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。所以说AQS通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:

  • 先进先出保证了公平性
  • 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁

CLH队列用一张原理图来表示大致如下:

image-20230210111957060

CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO

head 头结点又叫哨兵节点,线程thread为null

1.3 内部类Node

NodeAQS的内部类,每个等待资源的线程都会封装成Node节点组成CLH队列、等待队列,所以说Node是非常重要的部分,理解它是理解AQS的第一步。

内部结构

Node类中的变量都很好理解,只有waitStatus、nextWaiter没有细说,下面做个补充说明

waitStatus等待状态如下

waitStatus等待状态如下

nextWaiter特殊标记

  • NodeCLH队列时,nextWaiter表示共享式或独占式标记
  • Node在条件队列时,nextWaiter表示下个Node节点指针

2、AQS前置知识点

2.1、模板方法

AbstractQueuedSynchronizer是个抽象类,所有用到方法的类都要继承此类的若干方法,对应的设计模式就是模版模式

模版模式定义:一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行。这种类型的设计模式属于行为型模式。

2.2、LookSupport

LockSupport 是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,当然阻塞之后肯定得有唤醒的方法。常用方法如下:

public static void park(Object blocker); // 暂停当前线程
public static void parkNanos(Object blocker, long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(Object blocker, long deadline); // 暂停当前线程,直到某个时间
public static void park(); // 无期限暂停当前线程
public static void parkNanos(long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(long deadline); // 暂停当前线程,直到某个时间
public static void unpark(Thread thread); // 恢复当前线程
public static Object getBlocker(Thread t);

park是因为park英文意思为停车。我们如果把Thread看成一辆车的话,park就是让车停下,unpark就是让车启动然后跑起来。

与Object类的wait/notify机制相比,park/unpark有两个优点:

  1. thread为操作对象更符合阻塞线程的直观定义
  2. 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll 唤醒所有等待的线程),增加了灵活性。

park/unpark调用的是 Unsafe(提供CAS操作) 中的 native代码。

park/unpark 功能在Linux系统下是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。mutexcondition保护了一个 _counter 的变量,当 park 时,这个变量被设置为0。当unpark时,这个变量被设置为1。

2.3、CAS

CAS 是 CPU指令级别实现了原子性的比较和交换(Conmpare And Swap)操作,注意CAS不是锁只是CPU提供的一个原子性操作指令。图片

CAS在语言层面不进行任何处理,直接将原则操作实现在硬件级别实现,之所以可以实现硬件级别的操作核心是因为CAS操作类中有个核心类UnSafe类。

关于CAS引发的ABA问题、性能开销问题、只能保证一个共享变量之间的原则性操作问题,以前CAS中写过,在此不再重复讲解。

注意:并不是说 CAS 一定比SYN好,如果高并发执行时间久 ,用SYN好, 因为SYN底层用了wait() 阻塞后是不消耗CPU资源的。如果锁竞争不激烈说明自旋不严重,此时用CAS。

3. 条件变量

Objectwaitnotify函数是配合Synchronized锁实现线程间同步协作的功能,AQSConditionObject条件变量也提供这样的功能,通过ConditionObjectawaitsignal两类函数完成。

不同于Synchronized锁,一个AQS可以对应多个条件变量,而Synchronized只有一个。

如上图所示,ConditionObject内部维护着一个单向条件队列,不同于CHL队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在CHL队列, 条件队列出队的节点,会入队到CHL队列。

当某个线程执行了ConditionObjectawait函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObjectsignal函数,会将条件队列头部线程节点转移到CHL队列参与竞争资源,具体流程如下图

图片

最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prevnext变量都是null

4. 独占模式 源码讲解

讲完了AQS的一些基础定义,我们就可以开始学习同步的具体运行机制了,为了更好的演示,我们用ReentrantLock作为使用入口,一步步跟进源码探究AQS底层是如何运作的,这里说明一下,因为ReentrantLock底层调用的AQS是独占模式,所以下文讲解的AQS源码也是针对独占模式的操作

4.1 加锁过程

我们都知道,ReentrantLock的加锁和解锁方法分别为lock()和unLock(),我们先来看获取锁的方法。

final void lock() {if (compareAndSetState(0, 1))//设置持有锁线程setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}

逻辑很简单,线程进来后直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程,否则就调用acquire(1)再次尝试获取锁。

我们假定有两个线程A和B同时竞争锁,A进来先抢占到锁,此时的AQS模型图就类似这样:

继续走下面的方法

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

acquire包含了几个函数的调用,

  • tryAcquire:尝试直接获取锁,如果成功就直接返回;
  • addWaiter:将该线程加入等待队列FIFO的尾部,并标记为独占模式;
  • acquireQueued:线程阻塞在等待队列中获取锁,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • selfInterrupt:自我中断,就是既拿不到锁,又在等待时被中断了,线程就会进行自我中断selfInterrupt(),将中断补上。

我们一个个来看源码,并结合上面的两个线程来做场景分析。

4.1.1 tryAcquire

不用多说,就是为了再次尝试获取锁

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}

当线程B进来后,nonfairTryAcquire方法首先会获取state的值,如果为0,则正常获取该锁,不为0的话判断是否是当前线程占用了,是的话就累加state的值,这里的累加也是为了配合释放锁时候的次数,从而实现可重入锁的效果。

当然,因为之前锁已经被线程A占领了,所以这时候tryAcquire会返回false,继续下面的流程。

4.1.2 addWaiter ;抢锁失败,CLH入队

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}

这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待队列中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部,然后返回当前结点的前驱结点,

private Node enq(final Node node) {// CAS"自旋",直到成功加入队尾for (;;) {Node t = tail;if (t == null) {// 队列为空,初始化一个Node结点作为Head结点,并将tail结点也指向它if (compareAndSetHead(new Node()))tail = head;} else {// 把当前结点插入队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

第一遍循环时,tail指针为空,初始化一个Node结点,并把head和tail结点都指向它,然后第二次循环进来之后,tail结点不为空了,就将当前的结点加入到tail结点后面,也就是这样:

如果此时有另一个线程C进来的话,发现锁已经被A拿走了,然后队列里已经有了线程B,那么线程C就只能乖乖排到线程B的后面去,

图片

4.1.3 acquireQueued

一旦加入同步队列,就需要使用该方法,自旋阻塞 唤醒来不断的尝试获取锁,直到被中断或获取到锁。

接着解读方法,通过tryAcquire()和addWaiter(),我们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么处理线程呢?其实答案也很简单,能做的事无非两个:

1、循环让线程再抢资源。但仔细一推敲就知道不合理,因为如果有多个线程都参与的话,你抢我也抢只会降低系统性能

2、进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源

毫无疑问,选择2更加靠谱,acquireQueued方法做的也是这样的处理:

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 标记是否会被中断boolean interrupted = false;// CAS自旋for (;;) {// 获取当前结点的前结点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 获取锁失败,则将此线程对应的node的waitStatus改为CANCELcancelAcquire(node);}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)// 前驱结点等待状态为"SIGNAL",那么自己就可以安心等待被唤醒了return true;if (ws > 0) {/** 前驱结点被取消了,通过循环一直往前找,直到找到等待状态有效的结点(等待状态值小于等于0) ,* 然后排在他们的后边,至于那些被当前Node强制"靠后"的结点,因为已经被取消了,也没有引用链,* 就等着被GC了*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 如果前驱正常,那就把前驱的状态设置成SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}

acquireQueued方法的流程是这样的:

1、CAS自旋,先判断当前传入的Node的前结点是否为head结点,是的话就尝试获取锁,获取锁成功的话就把当前结点置为head,之前的head置为null(方便GC),然后返回

2、如果前驱结点不是head或者加锁失败的话,就调用shouldParkAfterFailedAcquire,将前驱节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程,parkAndCheckInterrupt在挂起线程后会判断线程是否被中断,如果被中断的话,就会重新跑acquireQueued方法的CAS自旋操作,直到获取资源。

ps:LockSupport.park方法会让当前线程进入waitting状态,在这种状态下,线程被唤醒的情况有两种,一是被unpark(),二是被interrupt(),所以,如果是第二种情况的话,需要返回被中断的标志,然后在acquire顶层方法的窗口那里自我中断补上

此时,因为线程A还未释放锁,所以线程B状态都是被挂起的

到这里,加锁的流程就分析完了,其实整体来说也并不复杂,而且当你理解了独占模式加锁的过程,后面释放锁和共享模式的运行机制也没什么难懂的了,所以整个加锁的过程还是有必要多消化下的,也是AQS的重中之重。

为了方便你们更加清晰理解,我加多一张流程图吧

AQS

4.2 释放锁

说完了加锁,我们来看看释放锁是怎么做的,AQS中释放锁的方法是release(),当调用该方法时会释放指定量的资源 (也就是锁) ,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

还是一步步看源码吧

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

4.2.1 tryRelease

代码上可以看出,核心的逻辑都在tryRelease方法中,该方法的作用是释放资源,AQS里该方法没有具体的实现,需要由自定义的同步器去实现,我们看下ReentrantLock代码中对应方法的源码:

protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}

tryRelease方法会减去state对应的值,如果state为0,也就是已经彻底释放资源,就返回true,并且把独占的线程置为null,否则返回false。

此时AQS中的数据就会变成这样:

图片

完全释放资源后,当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor()

private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)//将head结点的状态置为0compareAndSetWaitStatus(node, ws, 0);//找到下一个需要唤醒的结点sNode s = node.next;//如果为空或已取消if (s == null || s.waitStatus > 0) {s = null;// 从后向前,直到找到等待状态小于0的结点,前面说了,结点waitStatus小于0时才有效for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)s = t;}// 找到有效的结点,直接唤醒if (s != null)LockSupport.unpark(s.thread);//唤醒
}

方法的逻辑很简单,就是先将head的结点状态置为0,避免下面找结点的时候再找到head,然后找到队列中最前面的有效结点,然后唤醒,我们假设这个时候线程A已经释放锁,那么此时队列中排最前边竞争锁的线程B就会被唤醒。然后被唤醒的线程B就会尝试用CAS获取锁,回到acquireQueued方法的逻辑

for (;;) {// 获取当前结点的前结点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;
}

当线程B获取锁之后,会把当前结点赋值给head,然后原先的前驱结点 (也就是原来的head结点) 去掉引用链,方便回收,这样一来,线程B获取锁的整个过程就完成了,此时AQS的数据就会变成这样;

到这里,我们已经分析完了AQS独占模式下加锁和释放锁的过程,也就是tryAccquire->tryRelease这一链条的逻辑,除此之外,AQS中还支持共享模式的同步,这种模式下关于锁的操作核心其实就是tryAcquireShared->tryReleaseShared这两个方法,我们可以简单看下

5. 共享模式 源码讲解

5.1 获取锁

AQS中,共享模式获取锁的顶层入口方法是acquireShared,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源。

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);
}

该方法里包含了两个方法的调用,

tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。

doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。

5.1.1 tryAcquireShared

tryAcquireShared在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0代表获取成功,但没有剩余的资源,也就是state已经为0;
  • 正值代表获取成功,而且state还有剩余,其他线程可以继续领取

当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared方法

5.1.2 doAcquireShared

此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:

private void doAcquireShared(int arg) {// 加入队列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;// CAS自旋for (;;) {final Node p = node.predecessor();// 判断前驱结点是否是headif (p == head) {// 尝试获取一定数量的锁int r = tryAcquireShared(arg);if (r >= 0) {// 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程setHeadAndPropagate(node, r);// 让前驱结点去掉引用链,方便被GCp.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}private void setHeadAndPropagate(Node node, int propagate) {Node h = head;// head指向自己setHead(node);// 如果还有剩余量,继续唤醒下一个邻居线程if (propagate > 0 || h == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}

看到这里,你会不会一点熟悉的感觉,这个方法的逻辑怎么跟上面那个acquireQueued() 那么类似啊?对的,其实两个流程并没有太大的差别。只是doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完一定的资源后,发现还有剩余的资源,就继续唤醒下一个邻居线程,这才符合"共享"的思想嘛。

这里我们可以提出一个疑问,共享模式下,当前线程释放了一定数量的资源,但这部分资源满足不了下一个等待结点的需要的话,那么会怎么样?

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

接着往下说吧,唤醒下一个邻居线程的逻辑在doReleaseShared()中,我们放到下面的释放锁来解析。

5.2 释放锁

共享模式释放锁的顶层方法是releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

该方法同样包含两部分的逻辑:

tryReleaseShared:释放资源。

doAcquireShared:唤醒后继结点。

tryAcquireShared方法一样,tryReleaseShared在AQS中没有具体的实现,由子同步器自己去定义,但功能都一样,就是释放一定数量的资源。

释放完资源后,线程不会马上就收工,而是唤醒等待队列里最前排的等待结点。

5.2.1 doAcquireShared

唤醒后继结点的工作在doReleaseShared()方法中完成,我们可以看下它的源码:

private void doReleaseShared() {for (;;) {// 获取等待队列中的head结点Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// head结点waitStatus = -1,唤醒下一个结点对应的线程if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒后继结点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

代码没什么特别的,就是如果等待队列head结点的waitStatus为-1的话,就直接唤醒后继结点,唤醒的方法unparkSuccessor()在上面已经讲过了,这里也没必要再复述。

总的来看,AQS共享模式的运作流程和独占模式很相似,只要掌握了独占模式的流程运转,共享模式什么的不就那样吗,没难度。这也是我为什么共享模式讲解中不画流程图的原因,没必要嘛。

6. Condition

介绍完了AQS的核心功能,我们再扩展一个知识点,在AQS中,除了提供独占/共享模式的加锁/解锁功能,它还对外提供了关于Condition的一些操作方法。

Condition是个接口,在jdk1.5版本后设计的,基本的方法就是await()signal()方法,功能大概就对应Objectwait()notify(),Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现 ,AQS中就定义了一个类ConditionObject来实现了这个接口。

那么它应该怎么用呢?我们可以简单写个demo来看下效果

public class ConditionDemo {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();Thread tA = new Thread(() -> {lock.lock();try {System.out.println("线程A加锁成功");System.out.println("线程A执行await被挂起");condition.await();System.out.println("线程A被唤醒成功");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();System.out.println("线程A释放锁成功");}});Thread tB = new Thread(() -> {lock.lock();try {System.out.println("线程B加锁成功");condition.signal();System.out.println("线程B唤醒线程A");} finally {lock.unlock();System.out.println("线程B释放锁成功");}});tA.start();tB.start();}
}

执行main函数后结果输出为:

线程A加锁成功
线程A执行await被挂起
线程B加锁成功
线程B唤醒线程A
线程B释放锁成功
线程A被唤醒成功
线程A释放锁成功

代码执行的结果很容易理解,线程A先获取锁,然后调用await()方法挂起当前线程并释放锁,线程B这时候拿到锁,然后调用signal唤醒线程A。

毫无疑问,这两个方法让线程的状态发生了变化,我们仔细来研究一下,

翻看AQS的源码,我们会发现Condition中定义了两个属性firstWaiterlastWaiter,前面说了,AQS中包含了一个FIFO的CLH等待队列,每个Conditon对象就包含这样一个等待队列,而这两个属性分别表示的是等待队列中的首尾结点,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

注意:Condition当中的等待队列和AQS主体的同步等待队列是分开的,两个队列虽然结构体相同,但是作用域是分开的

6.1 await

先看await()的源码:

public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 将当前线程加入到等待队列中Node node = addConditionWaiter();// 完全释放占有的资源,并返回资源数int savedState = fullyRelease(node);int interruptMode = 0;// 循环判断当前结点是不是在Condition的队列中,是的话挂起while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

当一个线程调用Condition.await()方法,将会以当前线程构造结点,这个结点的waitStatus赋值为 Node.CONDITION,也就是-2,并将结点从尾部加入等待队列,然后尾部结点就会指向这个新增的结点。

private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}

我们依然用上面的demo来演示,此时,线程A获取锁并调用**Condition.await()**方法后,AQS内部的数据结构会变成这样:

在Condition队列中插入对应的结点后,线程A会释放所持有的资源,走到while循环那层逻辑

while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}

isOnSyncQueue方法的会判断当前的线程节点是不是在同步队列中,这个时候此结点还在Condition队列中,所以该方法返回false,这样的话循环会一直持续下去,线程被挂起,等待被唤醒,此时,线程A的流程暂时停止了。

当线程A调用await()方法挂起的时候,线程B获取到了线程A释放的资源,然后执行signal()方法:

6.2 signal

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}

先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。接着调用doSignal()方法来唤醒线程。

private void doSignal(Node first) {// 循环,从队列一直往后找不为空的首结点do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}final boolean transferForSignal(Node node) {// CAS循环,将结点的waitStatus改为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 上面已经分析过,此方法会把当前结点加入到等待队列中,并返回前驱结点Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}

doSignal的代码中可以看出,这时候程序寻找的是Condition等待队列中首结点firstWaiter的结点,此时该结点指向的是线程A的结点,所以之后的流程作用的都是线程A的结点。

这里分析下transferForSignal方法,先通过CAS自旋将结点waitStatus改为0,然后就把结点放入到同步队列 (此队列不是Condition的等待队列) 中,然后再用CAS将同步队列中该结点的前驱结点waitStatus改为Node.SIGNAL,也就是-1,此时AQS的数据结构大概如下(少画了个箭头,大家就当head结点是线程A结点的前驱结点就好):

回到await()方法,当线程A的结点被加入同步队列中时,isOnSyncQueue()会返回true,跳出循环,

while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);

接着执行acquireQueued()方法,这里就不用多说了吧,尝试重新获取锁,如果获取锁失败继续会被挂起,直到另外线程释放锁才被唤醒。

所以,当线程B释放完锁后,线程A被唤醒,继续尝试获取锁,至此流程结束。

对于这整个通信过程,我们可以画一张流程图展示下:

6.3 总结

说完了Condition的使用和底层运行机制,我们再来总结下它跟普通 wait/notify 的比较,一般这也是问的比较多的,Condition大概有以下两点优势:

  • Condition 需要结合 Lock 进行控制,使用的时候要注意一定要对应的unlock(),可以对多个不同条件进行控制,只要new 多个 Condition对象就可以为多个线程控制通信,wait/notify 只能和 synchronized 关键字一起使用,并且只能唤醒一个或者全部的等待队列;
  • Condition 有类似于 await 的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是 park/unpark 的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是 wait/notify 会产生先唤醒再挂起的死锁。

相关文章:

谈谈Java多线程离不开的AQS

如果你想深入研究Java并发的话&#xff0c;那么AQS一定是绕不开的一块知识点&#xff0c;Java并发包很多的同步工具类底层都是基于AQS来实现的&#xff0c;比如我们工作中经常用的Lock工具ReentrantLock、栅栏CountDownLatch、信号量Semaphore等&#xff0c;而且关于AQS的知识点…...

国际化语言,多语言三种方式

可以用透传的方式&#xff0c;自己写local的json文件&#xff0c;不需要配置什么&#xff0c;直接传&#xff0c;自己写方法i18n nextjsi18n umi4一、透传的方式 export const AppContext React.createContext<any>({})app.tsx 用context包裹import type { AppProps } f…...

C++——哈希3|位图

目录 常见哈希函数 位图 位图扩展题 位图的应用 常见哈希函数 1. 直接定址法--(常用) 这种方法不存在哈希冲突 取关键字的某个线性函数为散列地址&#xff1a;Hash&#xff08;Key&#xff09; A*Key B 优点&#xff1a;简单、均匀 缺点&#xff1a;需要事先知道关键字的…...

75 error

全部 答对 答错 选择题 3. 某公司非常倚重预测型方法交付项目&#xff0c;而其招聘的新项目经理却习惯于运用混合型方法。项目范围包含很多不清晰的需求。项目经理应该如何规划项目的交付&#xff1f; A company that is heavily focused on delivering projects using predi…...

ESP-C3入门8. 连接WiFi并打印信息

ESP-C3入门8. 连接WiFi并打印信息一、ESP32 连接WiFi的基本操作流程1. 初始化nvs存储2. 配置WiFi工作模式3. 设置WiFi登陆信息4. 启动WiFi5. 开启连接6. 判断是否成功二、事件处理函数1. 定义事件处理函数2. 创建事件组3. 在事件处理函数中设置事件组位4. 在其他任务中等待事件…...

使用python将EXCEL表格中数据转存到数据库

使用Python将excel表格中数据转存到数据库 1. 思路&#xff1a; 1&#xff09; 使用python读取excel表格中数据 2&#xff09;根据数据生成sql语句 3&#xff09;批量运行sql语句 2. 代码&#xff1a; import pandas as pddef readExcel(path, excel_file):return pd.read_e…...

【C++】类和对象(三)

目录 一、构造函数补充 1、初始化列表 1.1、初始化列表概念 1.2、初始化列表性质 2、explicit关键字 二、static成员 1、概念及使用 2、性质总结 三、友元 1、友元函数 2、友元类 四、内部类 五、拷贝对象时的一些编译器优化 一、构造函数补充 在《类和对象&#x…...

vTESTstudio - VT System CAPL Functions - General/Trigger Function

前面文章中我们已经介绍了常用的几种板卡的基本信息&#xff0c;那这些板卡该如何去通过软件调用呢&#xff1f;带着这个问题我们开始新的一块内容 - VT系统相关的自动化控制函数介绍&#xff0c;我会按照不同的板卡来分类&#xff0c;对其可控制的函数进行介绍&#xff0c;方便…...

IDEA 快捷键

ctrlD &#xff1a;复制当前行到下一行 ctrlO : 重写当前类的方法 ctrlshiftu : 大小写转化 Alt 上/下 &#xff1a;跳到上一个、下一个函数 Alt 左/右 : 回到上一个、下一个文件 Alt 回车 &#xff1a; 代码修正 Alt Insert &#xff1a; 插入代码 Ctrl Alt L &#xf…...

2023新华为OD机试题 - 入栈出栈(JavaScript) | 刷完必过

入栈出栈 题目 向一个空栈中依次存入正整数 假设入栈元素N(1 <= N <= 2^31-1) 按顺序依次为Nx ... N4、N3、N2、N1, 当元素入栈时,如果N1=N2+...Ny (y的范围[2,x],1 <= x <= 1000) 则N1到Ny全部元素出栈,重新入栈新元素M(M=2*N1) 如依次向栈存储6、1、2、3,当存…...

微信公众号扫码授权登录思路

引言 上学期研究了一下微信登录相关内容&#xff0c;也写了两三篇笔记&#xff0c;但是最后实际登录流程没有写&#xff0c;主要因为感觉功能完成有所欠缺&#xff0c;一直也没有好的思路&#xff1b;这两天我又看了看官方文档&#xff0c;重新构思了一下微信公众号登录相关的…...

数据结构与算法基础-学习-10-线性表之顺序栈的清理、销毁、压栈、弹栈

一、函数实现顺序栈的其他函数实现&#xff0c;请看之前的博客链接《数据结构与算法基础-学习-09-线性表之栈的理解、初始化顺序栈、判断顺序栈空、获取顺序栈长度的实现》。1、ClearSqStack&#xff08;1&#xff09;用途清理栈的空间。只需要栈顶指针和栈底指针相等&#xff…...

Hazel游戏引擎(005)

本人菜鸟&#xff0c;文中若有代码、术语等错误&#xff0c;欢迎指正 我写的项目地址&#xff1a;https://github.com/liujianjie/GameEngineLightWeight&#xff08;中文的注释适合中国人的你&#xff09; 文章目录前言关键操作代码文件关键代码代码流程代码文件关键代码exter…...

牛客网Python篇数据分析习题(四)

1.现有一个Nowcoder.csv文件&#xff0c;它记录了牛客网的部分用户数据&#xff0c;包含如下字段&#xff08;字段与字段之间以逗号间隔&#xff09;&#xff1a; Nowcoder_ID&#xff1a;用户ID Level&#xff1a;等级 Achievement_value&#xff1a;成就值 Num_of_exercise&a…...

盲盒如何创业?

所谓的“盲盒”&#xff0c;受众群体大部分是那些爱碰运气的人&#xff0c;顾客买的是那种在打开盲盒时一刹那的惊喜感和神秘感&#xff0c;在打开盲盒之前&#xff0c;谁也不知道自己会得到什么&#xff0c;这也是为什么消费者更愿意购买的原因。网上的盲盒&#xff0c;主要是…...

第1集丨Java中面向对象相关概念汇总

目录一、基本概念1.1 类1.2 属性1.3 方法1.4 静态1.5 包1.6 import二、高级概念2.1 构造方法2.2 继承2.3 super & this2.4 多态2.5 方法重载2.6 方法重写2.7 访问权限2.8 内部类2.9 final2.10 抽象2.11 接口2.12 匿名类面向对象的编程思想力图使计算机语言中对事物的描述与…...

高性能(二)

三、读写分离和分库分表 1.读写分离 1.1 概述 将数据库的读写操作分散到不同的数据库节点上 通常一主多从一台主数据库负责写&#xff0c;多台从数据库负责读。 主库和从库之间会进行数据同步&#xff0c;以保证从库中数据的准确性。 1.2 问题及解决 1.2.1 问题 主从同…...

Allegro如何实现同一个屏幕界面分屏显示操作指导

Allegro如何实现同一个屏幕界面分屏显示操作指导 在做PCB设计的时候,会需要分屏显示,比如一边是放大的视图,另外一边是缩小的视图,Allegro支持同一个屏幕界面下进行分屏显示,如下图 而且会实时同步起来 如何分屏,具体操作如下 点击View...

前后端一些下载与配置(第二篇 第10天过后)nuxt banner redis 短信服务

NUXT 应该是不用怎么装&#xff1f; 有现成的 axios 还需要在npm吗 好像已经有现成的了 banner banner 笔记汇总P396 Redis Linux安装redis tar -xzvf redis-6.2.6.tar.gz cd redis-6.2.6 照着他做 然后 cd /usr/local/redis/bin ./redis-server /usr/local/redis…...

OSG三维渲染引擎编程学习之四十八:“第五章:OSG场景渲染” 之 “5.6 多重纹理映射”

目录 第五章 OSG场景渲染 5.6 多重纹理映射 5.6.1 多重纹理映射介绍 5.6.2 多重纹理映射示例...

对Node.js 的理解?优缺点?应用场景?

一、是什么 Node.js 是一个开源与跨平台的 JavaScript 运行时环境 在浏览器外运行 V8 JavaScript 引擎&#xff08;Google Chrome 的内核&#xff09;&#xff0c;利用事件驱动、非阻塞和异步输入输出模型等技术提高性能 可以理解为 Node.js 就是一个服务器端的、非阻塞式I/…...

Bean的生命周期

所谓的生命周期指的是一个对象从诞生到销毁的整个生命过程&#xff0c;我们把这个过程就叫做一个对象的生命周期~~ Bean的生命周期分为以下五大部分&#xff1a; 实例化&#xff08;为 Bean 分配内存空间&#xff09; 设置属性&#xff08;Bean对象注入/装配&#xff09; 初…...

Python学习-----函数2.0(函数对象,名称空间,作用域-->全局变量与局部变量)

目录 前言&#xff1a; 1.函数对象 &#xff08;1&#xff09;函数对象的引用 &#xff08;2&#xff09;函数可以放到序列里面 &#xff08;3&#xff09;函数可以作为参数 &#xff0c; 传递给另一个函数 2.名称空间 3.作用域 &#xff08;1&#xff09;作用域的理解 …...

Java中Json字符串和Java对象的互转

JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式。诞生于 2002 年。易于人阅读和编写。同时也易于机器解析和生成。JSON 是目前主流的前后端数据传输方式。 JSON 采用完全独立于语言的文本格式&#xff0c;但是也使用了类似于 C 语言家族的…...

代码随想录NO42 | 动态规划_Leetcode70. 爬楼梯 (进阶) 322. 零钱兑换 279.完全平方数

动态规划_Leetcode70. 爬楼梯 &#xff08;进阶&#xff09; 322. 零钱兑换 279.完全平方数70. 爬楼梯 &#xff08;进阶&#xff09; 在原题基础上&#xff0c;改为&#xff1a;一步一个台阶&#xff0c;两个台阶&#xff0c;三个台阶&#xff0c;…&#xff0c;直到 m个台阶…...

【C++从入门到放弃】初识C++(基础知识入门详解)

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《C从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; C基础…...

企业工程项目管理系统源码+spring cloud 系统管理+java 系统设置+二次开发

工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#xff1a;实现对数据字典标签的增删改查操作 2、编码管理&#xff1a;实现对系统编码的增删改查操作 3、用户管理&#xff1a;管理和查看用户角色 4、菜单管理&#xff1a;实现对系统菜单的增删改查操…...

【GPLT 三阶题目集】L3-016 二叉搜索树的结构

二叉搜索树或者是一棵空树&#xff0c;或者是具有下列性质的二叉树&#xff1a; 若它的左子树不空&#xff0c;则左子树上所有结点的值均小于它的根结点的值&#xff1b;若它的右子树不空&#xff0c;则右子树上所有结点的值均大于它的根结点的值&#xff1b;它的左、右子树也分…...

核心交换机安全多业务高性能万兆交换机

RG-S5750-24SFP/12GT交换机是锐捷网络推出的融合了高性能、高安全、多业务的新一代三层交换机。RG-S5750-24SFP/12GT 交换机能够提供灵活的介质接口&#xff0c;满足网络建设中不同介质的连接需要。全千兆的端口形态&#xff0c;加上可扩展的高密度万兆端口&#xff0c;提供1&a…...

Android APK 签名打包原理分析(三)【静默安装的实现方案】

背景 小编目前从事的系统定制类工作,有客户提出了,需要后台“静默安装”他们的app,也就是悄无声息的安装,而且特别强调,不可以跳出任何安装引导页面,他们的app下载完成之后,后台调用公开的android install代码,系统就后台完成安装,安装完成之后,重新打开应用就可以。…...

网站 手机网站/惠州百度关键词优化

微信小程序接入腾讯云IM即时通讯&#xff08;会话列表之未读消息&#xff0c;显示最新一条消息开发步骤&#xff09; 1.未读消息思路 首先&#xff0c;获取未读消息第一步就是要先能接收到对方发送的信息&#xff0c;也就是要在官方api中给的监听新消息事件做处理。先看下图需…...

中国最大的库存尾货清货平台/seo关键词排名优化品牌

近来公司redmine服务器表现很糟糕&#xff0c;在16核&#xff0c;64GRAM的机器上&#xff0c;压测结果竟然只有每秒5~7个请求&#xff0c;部分页面一个都出不来。 以下是我对Redmine性能优化方案: redmine服务器性能问题排查与优化建议&#xff1a; 以下建议的方案是基于redmi…...

星月教你做网站的文档/360免费建站教程

阅读本文前&#xff0c;请您先点击上面的蓝色字体&#xff0c;再点击“关注”&#xff0c;这样您就可以继续免费收到最新文章了。每天都有分享。完全是免费订阅&#xff0c;请放心关注。总是在抖音上听到好听的日语歌曲&#xff0c;却不知道叫什么名字&#xff0c;今天就为大家…...

上海免费网站建设/宁波seo教学

作者&#xff1a;张志朋出处&#xff1a;https://blog.52itstyle.com题记工作也有几多年了&#xff0c;无论是身边遇到的还是耳间闻到的&#xff0c;多多少少也积攒了自己的一些经验和思考&#xff0c;当然&#xff0c;博主并没有太多接触高大上的分布式架构实践&#xff0c;相…...

网站建设网络公司整站源码/seo基础知识培训视频

首先&#xff0c;Spark是MapReduce-like&#xff08;架构上和多数分布式计算框架类似&#xff09;&#xff0c;Spark有分配任务的主节点&#xff08;Driver&#xff09;和执行计算的工作节点&#xff08;Worker&#xff09;。 其次&#xff0c;Low-latency基本上应该是源于Work…...

福田工作招聘/济南seo整站优化招商电话

在开始使用Ajax辅助方法前&#xff0c;必须在页面中载入jQuery以及jquery.unobtrusive-ajax.js文件才能正常执行。 为了让网站载入适当的JS函数库&#xff0c;必须先让Layout页面载入适当的JS文件&#xff0c;在MVC4模板中&#xff0c;默认已将Jquery文件加入&#xff0c;代码如…...