Java并发:互斥锁,读写锁,公平锁,Condition,StampedLock
阅读本文之前可以看一看 Java 多线程基础:
Java:多线程(进程线程,线程状态,创建线程,线程操作)
Java:多线程(同步死锁,锁&原子变量,线程通信,线程池)
1,互斥锁
1.1,可重入锁
锁的可重入性(Reentrant Locking)是指在同一个线程中,已经获取锁的线程可以再次获取该锁而不会导致死锁。这种特性允许线程在持有锁的情况下,可以递归地调用自身的同步方法或代码块,而不会因为再次尝试获取相同的锁而被阻塞。显然,通常的锁都要设计成可重入的。否则就会发生死锁。
synchronized关键字,就是可重入锁。在一个 synchronized 方法 method1() 里面调用另外一个 synchronized 方法 method2() 。如果 synchronized 关键字不可重入,那么再 method2() 处就会发生阻塞,这显然不可行。
public synchronized void method1() {method2(); // 同一个线程可以再次进入 method2()
}public synchronized void method2() {// 执行某些操作
}
Concurrent包中的与互斥锁(ReentrantLock)相关类之间的继承层次:
Lock是一个接口,其定义如下:
- lock():获取锁。如果锁已经被其他线程占用,则当前线程会被阻塞,直到锁被释放。
- lockInterruptibly():它允许线程在等待锁的过程中响应中断。如果线程在等待锁时被中断,抛出 InterruptedException 并退出等待。
- tryLock():尝试获取锁,但不会阻塞线程。如果锁定成功,返回 true;如果锁已被其他线程占用,立即返回 false。
- tryLock(long time, TimeUnit unit):尝试在指定的时间内获取锁。如果锁在指定时间内被释放,则返回 true 并成功获取锁;否则返回 false。期间如果线程被中断,会抛出 InterruptedException。
- unlock():释放锁。通常在获取锁之后的 finally 块中调用,确保锁在任务完成后被释放,避免死锁。
- newCondition():返回一个绑定到该锁的新 Condition 实例。Condition 提供了类似于 Object 的 wait、notify、notifyAll 方法的功能,但更加灵活,可以实现多条件等待/通知机制。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();
}
ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中:
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;public void lock() {sync.lock();}public void lock() {sync.lock();}
1.2,公平锁&非公平锁( lock() & tryAcquire())
Sync是一个抽象类,它有两个子类FairSync和NonfairSync,分别对应公平锁和非公平锁。从下面的ReentrantLock否早方法可以看出,会传入一个布尔类型的变量fair指定锁是公平锁还是非公平锁。默认设置的是非共公平锁,是为了提高效率,减少线程切换。
public ReentrantLock() {sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
公平锁: 公平锁是一种严格遵循先来先服务原则的锁机制。当多个线程争用同一个锁时,锁会按照线程请求锁的顺序来分配,即先请求的线程优先获取锁,后请求的线程则需要等待前面的线程释放锁。
Lock fairLock = new ReentrantLock(true); // true 表示使用公平锁static final class FairSync extends Sync {
//没有一上来就抢锁,在这个函数内部排队,是公平的。final void lock() {acquire(1);}
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
-
addWaiter(Node.EXCLUSIVE):该方法将当前线程添加到等待队列中,Node.EXCLUSIVE表示该线程请求的是独占锁。它返回一个Node对象,表示这个线程在队列中的位置。
-
acquireQueued(node, arg):这个方法接收先前创建的Node对象,尝试在队列中获取锁。它通常会阻塞当前线程,直到锁变为可用,或者发生中断。
-
selfInterrupt()方法用于中断当前线程。当一个线程在获取锁时未能成功且已经被加入等待队列后,如果该线程发现自己被阻塞了,调用selfInterrupt()可以通知系统该线程需要被中断,通常是为了响应外部中断请求。
//FairSync
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//只有当c===0(没有线程持有锁)if (c == 0) {//检查当前线程前面是否有其他线程排队等待获取锁if (!hasQueuedPredecessors() &&//更改锁的状态compareAndSetState(0, acquires)) {//将当前线程设置为锁的独占持有者setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
非公平锁: 非公平锁是一种不保证锁分配顺序的锁机制。线程在尝试获取锁时,可能会直接“插队”,即使有其他线程已经在等待锁。如果锁是空闲的,任何线程都可以获取它,无论它们何时请求锁。
Lock nonFairLock = new ReentrantLock(); // 默认构造方法,即非公平锁
static final class NonfairSync extends Sync {...final void lock() {//一上来就尝试修改state值,也就是抢锁。//不考虑队列中有没有其他线程在排队。//如果成功,表示当前线程获得了锁,此时调用 setExclusiveOwnerThread(Thread.currentThread()) 设置当前线程为锁的拥有者。//如果 compareAndSetState 失败,说明锁已经被其他线程占用,此时调用 acquire(1) 方法,进入获取锁的队列等待。if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
//NonfairSync
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//如果state为0,不考虑队列中没有等待的线程,直接抢锁if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果排他线程就是当前线程,才直接设置state值。else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
代码对比:
1.3,AbstractQueuedSynchronizer(AQS)
Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:
- ① 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,使用CAS保证线程安全。
- ② 需要记录当前是哪个线程持有锁。
- ③ 需要底层支持对一个线程进行阻塞或唤醒操作。
- ④ 需要有一个队列维护所有阻塞的线程,这个队列也必须是线程安全的无锁队列,也需要使用CAS。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {// ...private transient Thread exclusiveOwnerThread; // 记录持有锁的线程
}public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {private volatile int state; // 记录锁的状态,通过CAS修改state的值。 // ...
}
针对要素 ① 和 ②,在上面两个类中有对应的体现:state取值可以是0,1,还可以大于1,就是为了支持锁的可重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。
- 当state=0,没有线程持有锁,exclusiveOwnerThread=null;
- 当state=1,有一个线程持有锁,exclusiveOwnerThread=该线程;
- 当state>0,说明该线程(exclusiveOwnerThread)重入了该锁。
对于要素 ③,Unsafe类提供了阻塞或唤醒线程的一堆操作原语,也就是park/unpark。
public final class Unsafe {public native void unpark(Object var1);public native void park(boolean var1, long var2);
}
有一个LockSupport的工具类,对这一原语做了简单封装:
public class LockSupport { // ...private static final Unsafe U = Unsafe.getUnsafe(); public static void park() {U.park(false, 0L); }public static void unpark(Thread thread) { if (thread != null)U.unpark(thread); }
}
在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread thread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。 unpark(Thread thread),它实现了一个线程对另外一个线程的"精准唤醒"。notify也只是唤醒某一个线程,但无法执行指定唤醒哪个线程。
针对要素 ④,在AQS中利用双向链表和CAS实现了一个阻塞队列。
public abstract class AbstractQueuedSynchronizer { // ...static final class Node {volatile Thread thread; // 每个Node对应一个被阻塞的线程volatile Node prev;volatile Node next; // ...}private transient volatile Node head; private transient volatile Node tail; // ...
}
阻塞队列是整个AQS核心中的核心。 head指向双向链表头部,tail指向双向链表尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把head向后移一个位置。
初始时,head=tail=NULL; 然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空的Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。
1.4,阻塞队列与唤醒机制(⭐)
addWaiter(…)方法将当前线程封装成一个 Node,然后添加到等待队列的尾部。该方法的目的是让线程进入同步队列,以便在适当的时机被唤醒或中断。
//AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {//创建节点,尝试将节点追加到队列尾部。Node node = new Node(Thread.currentThread(), mode);//获取tail节点,将tail节点的next设置为当前节点。Node pred = tail;//如果tail不存在,就初始化队列。if (pred != null) {node.prev = pred;//先尝试加到队列尾部,如果不成功,则执行enq(node);if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//enq内部会进行队列的初始化,新建一个空的Node。然后不断尝试自旋,直至成功把该Node加入队列尾部为止。enq(node);return node;
}
在addWaiter(…)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(…)方法完成。 线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从acquireQueued(…)返回。
进入acquireQueued(…),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移一个节点)。
//AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();//被唤醒,如果自己在队列头部,则尝试拿锁。if (p == head && tryAcquire(arg)) {//拿锁成功,出队列;setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
首先,acquireQueued(…)方法有一个返回值,表示什么意思?虽然该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法返回true;否则,返回false。
//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
static void selfInterrupt() {Thread.currentThread().interrupt();
}
当acquireQueued(…)返回true时,会调用selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。
阻塞就发生在下面这个方法中:
//AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
线程调用park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。
park()返回有两种情况:
- 其他线程调用了unpark(Thread t)
- 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次阻塞自己。不断循环重复此过程,直到拿到锁。
被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,返回fasle;如果是情况2,返回true。
1.5,unlock()
unlock不区分公平还是非公平:当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。
//ReentrantLock
public void unlock() {sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {//tryRelease(...)方法释放锁if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//unparkSuccessor(...)方法唤醒队列中的后继者unparkSuccessor(h);return true;}return false;
}
tryRelease方法:
//ReentrantLock
protected final boolean tryRelease(int releases) {int c = getState() - releases;//只有锁的拥有者才有资格调用unlock()函数,否则抛出异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;//每次调用tryRelease,state值减1,直到0,才代表锁可以被成功释放if (c == 0) {free = true;setExclusiveOwnerThread(null);}//没有使用CAS,而直接用set。因为是排他锁,只有一个线程能调减state值。setState(c);return free;
}
unparkSuccessor方法:
//ReentrantLock
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
}
1.6,trylock()
tryLock()实现基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。
//ReentrantLock
public boolean tryLock() {return sync.nonfairTryAcquire(1);
}
2,读写锁
和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间不互斥。
2.1,类继承层次
ReadWriteLock是一个接口,内部由两个Lock接口组成。
public interface ReadWriteLock {Lock readLock();Lock writeLock();
}
ReentrantReadWriteLock实现了该接口,使用方式如下:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
readLock.lock();
// 进行读取操作
readLock.unlock();Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
// 进行写操作
writeLock.unlock();
也就是说,当使用ReadWriteLock的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。
2.2,读写锁基本原理
从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两个视图呢?可以理解为一把锁,线程分为两类:读线程和写线程。读线程和写线程之间不互斥(可以同时拿到这把锁),读线程之间不互斥、写线程之间互斥。
从下面的构造方法也可以看出,readerLock和writerLock实际公用同一个sync对象。sync对象同互斥锁一样,分为非公平锁和公平两种策略,并继承自AQS。
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}
同互斥锁一样,读写锁也是用state变量表示锁状态的。只是state变量在这里的含义和互斥锁完全不同。在内部类Sync中,对state变量进行重新定义,如下所示:
abstract static class Sync extends AbstractQueuedSynchronizer { // ...static final int SHARED_SHIFT = 16;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 持有读锁的线程的重入次数static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 持有写锁的线程的重入次数static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // ...private volatile int state;
}
也就是把state变量拆成两半,低16位,用来记录写锁;高16位,用来记录"读"锁。但同一时间既然只能有一个线程写,为什么还需要16位呢?因为一个写线程可能多次重入。16位的数值范围是0到65535,这意味着一个线程最多可以重入写锁65535次。这个范围通常已经足够大,能够满足绝大多数场景中的需求;高16位的值等于5,既可以表示5个线程都拿到了该锁;也可以表示一个读线程重入了5次。
为什么要把一个int类型变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态呢?
CAS操作只能在一次操作中对一个内存地址的值进行比较和交换。无法用一次CAS同时操作两个int变量,所以用来一个int型的高16位和低16位分别表示读锁和写锁的状态。
- 当state = 0时,说明既没有线程持有读锁,也没有线程持有写锁;
- 当state !=0时,要么有线程持有读锁,要么有线程持有写锁,两者不能同时成立,因为读和写互斥。这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底时读线程还是写线程持有了该锁。
2.3,lock() & unlock()
public static class ReadLock implements Lock, java.io.Serializable { // ...public void lock() {sync.acquireShared(1); }public void unlock() {sync.releaseShared(1); }// ...
}
public static class WriteLock implements Lock, java.io.Serializable { // ...public void lock() { sync.acquire(1); }public void unlock() { sync.release(1); }// ...
}
acquire/release、acquireShared/releaseShared是AQS里面的两对模板方法。互斥锁和读写锁的写锁都是基于acquire/release模板方法来实现的。读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现的。这两对模板方法的代码如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ...public final void acquire(int arg) {if (!tryAcquire(arg) && // tryAcquire方法由多个Sync子类实现acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) // tryAcquireShared方法由多个Sync子类实现doAcquireShared(arg);}public final boolean release(int arg) {if (tryRelease(arg)) { // tryRelease方法由多个Sync子类实现 Node h = head;if (h != null && h.waitStatus != 0) unparkSuccessor(h);return true; }return false; }public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // tryReleaseShared方法由多个Sync子类实现doReleaseShared();return true; }return false; }// ...
}
将读/写、公平/非公平进行排列组合,就有4种组合。如下,上面的两个方法都是在Sync种实现的。Sync种的两个方法又是模板方法,在NonfairSync和FairSync种分别有实现。最终的对应关系如下:
- 读锁的公平实现:Sync.tryAcquireShared() + FairSync 中的 lock() & tryAcquire()。tryAcquire())。
- 读锁的非公平实现:Sync.tryAcquireShared() + NonfairSync中的 lock() & tryAcquire()。
- 写锁的公平实现:Sync.tryAcquire() + FairSync中的 lock() & tryAcquire()。
- 写锁的非公平实现:Sync.tryAcquire() + NonfairSync中的 lock() & tryAcquire()。
/*** Nonfair version of Sync*/
static final class NonfairSync extends Sync {private static final long serialVersionUID = -8159625535654395037L;// 写线程枪锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁之前永远不被阻塞,非公平锁return false; // writers can always barge}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程抢锁的时候,当队列中第一个元素是写线程的时候要阻塞return apparentlyFirstQueuedIsExclusive();}
}/*** Fair version of Sync*/
static final class FairSync extends Sync {private static final long serialVersionUID = -2274990926593161451L;// 写线程抢锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁之前,如果队列中有其他线程在排队,则阻塞。公平锁return hasQueuedPredecessors();}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程在抢锁之前,如果队列中有其他线程在排队,阻塞。公平锁return hasQueuedPredecessors();}
}
对于非公平,读锁和写锁的实现策略稍有差异:
- 写锁能抢锁,前提是state=0,只有在没有其他线程持有读锁或写锁的情况下,它才有机会去抢锁。或者state !=0,但哪个持有写锁的线程是自己,再次重入。写线程是非公平的,即writerShouldBlock()方法一直返回false。
- 对于读线程,假设当前线程被读线程持有,然后其他读线程还非公平地一直去抢,可能导致写线程永远拿不到锁,所以对于读线程的非公平,要做一些"约束"。当发现队列的第一个元素是写线程的时候,读线程也要阻塞,不能直接去抢。即偏向写线程。
【问题】读锁是共享的,为啥还要区分公平和非公平?
【答案】假设场景是图书馆,有足够多的Java从入门到精通,满足全球的人都可以同时借阅(读锁)。尽管有足够的书(共享资源),但是读请求还是需要排队,因为系统的性能是有限的,比如CPU和内存的处理能力。如果所有请求同时占用读锁,可能会导致资源竞争和性能下降。在公平的情况下,排队可以确保每个人都有机会顺利借书,而非公平可能导致一些请求长时间得不到响应。若每个线程都获得读锁,可能会导致过多的竞争和性能下降,尤其在高并发情况下。
2.4,WriteLock公平锁&非公平锁
写锁是排他锁,实现策略类似于互斥锁,重写了tryAcquire/tryRelease方法。
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();//写线程只能有一个,但写线程可以多次重入int w = exclusiveCount(c);//当c!=0 说明有读线程或者写线程持有锁if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)// w == 0, 说明锁被读线程持有,只能返回:w!=0,持有写锁的线程不是自己,也只能返回。if (w == 0 || current != getExclusiveOwnerThread())return false;//16位用满了,超过了最大重入次数。if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquiresetState(c + acquires);return true;}//公平锁实现和非公平锁实现只是writerShouldBlock()分别被FairSync和NonfairSync实现。if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;//抢锁成功后,将ownerThread设成自己。setExclusiveOwnerThread(current);return true;
}
- c!=0 and w==0,说明当前一定是读线程拿着锁,写锁一定拿不到,返回false。
- c!=0 and w!=0,说明当前一定是写线程拿着锁,执行current!=getExclusive-OwnerThread()的判断,发现ownerThread不是自己,返回false。
- c!=0 and w!=0 and current = getExclusiveOwnerThread(),才会走到 if (w + exclusiveCount(acquires) > MAX_COUNT)。判断重入次数,重入次数超过最大值,抛出异常。
- if (c=0),说明当前既没有读线程,也没有写线程持有该锁。可以通过CAS操作开抢。
tryRelease()分析
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);//因为写锁是排他的,在当前线程持有写锁的时候,其他线程不会持有写锁也不会持有读锁。所以,这里对state值的调减不需要CAS操作,直接减1即可。setState(nextc);return free;
}
tryLock和lock方法不区分公平/非公平。
//ReentrantReadWriteLock
public boolean tryLock( ) {return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();int c = getState();//当state不是0的时候,如果写线程获取锁的个数是0,或者写线程不是当前线程,则返回枪锁失败。if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}//只要不是上面的情况,则通过CAS设置state的值。//如果设置成功,就将排他线程设置为当前线程并返回true。if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;
}
2.5,ReadLock公平锁&非公平锁
读锁是共享锁,重写了tryAcquireShared/tryReleaseShared方法其实现策略和排他锁有很大差异。
//ReentrantReadWriteLock
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();//写锁被某线程持有,并且这个线程还不是自己,读锁肯定拿不到,直接返回。if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);//公平和非公平的差异就在于这个函数if (!readerShouldBlock() &&r < MAX_COUNT &&//CAS拿读锁,高16位加1compareAndSetState(c, c + SHARED_UNIT)) {//r之前等于0,说明这是第一个拿到读锁的线程if (r == 0) {firstReader = current;firstReaderHoldCount = 1;//不是第一个} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//上面拿读锁失败,进入这个函数不断自旋拿读锁return fullTryAcquireShared(current);
}
if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;
低16位不等于0,说明有写线程持有锁,并且只有当ownerThread !=自己时,才返回-1。如果current=ownerThread,则这段代码不会返回。这是因为一个写线程可以再次去拿读锁!也就是说,一个线程在持有WriteLock后,再去调用ReadLock.lock也是可以的。
compareAndSetState(c, c + SHARED_UNIT))
把state的高16位加1(读锁的状态),但因为是在高16位,必须把1左移16位再加1。
firstReader,cacheHoldConunter之类的变量,只是一些统计变量,在ReentrantRead-WriteLock对外的一些查询函数中会用到,例如,查询持有读锁的线程列表,但对整个读写互斥机制没有影响。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread();// ...for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free.return nextc == 0; }
}
因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个for循环 + CAS操作不断重试。这是tryReleaseShared和tryReleased的根本差异所在。
3,Condition
3.1,Condition与Lock的关系
Condition与Lock配合使用,提供了比传统的Object.wait()和Object.notify()更灵活的线程协调机制。Condition的主要作用是允许一个或多个线程在特定条件下等待,直到被其他线程通知。与Object的等待/通知机制相比,Condition提供了以下优势:
- 多条件支持:一个锁可以有多个Condition对象,每个对象可以代表一个不同的等待条件。
- 更细粒度的控制:使用Condition,线程可以在等待时释放锁,并在被唤醒后重新获得锁,避免了锁的持有时间过长的问题。
- 更好的可读性和维护性:代码逻辑更加清晰,易于理解。
【案例】Condition实现生产者和消费者对共享缓冲区的访问
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
List<Object> buffer = new ArrayList<>();
int capacity = 10;public void produce() {lock.lock();try {while (buffer.size() == capacity) {notFull.await(); // 等待缓冲区不满}buffer.add(new Object());notEmpty.signal(); // 通知消费者} finally {lock.unlock();}
}public void consume() {lock.lock();try {while (buffer.isEmpty()) {notEmpty.await(); // 等待缓冲区不空}buffer.remove(0);notFull.signal(); // 通知生产者} finally {lock.unlock();}
}
public interface Condition {void await() throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException;void awaitUninterruptibly();boolean awaitUntil(Date deadline) throws InterruptedException; void signal();void signalAll();
}
wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:
public interface Lock { void lock();void lockInterruptibly() throws InterruptedException; // 所有的Condition都是从Lock中构造出来的Condition newCondition();boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock();
}
3.2,Condition+数组实现阻塞队列
以ArrayBlockingQueue为例。如下面所示为一个用数组实现的阻塞队列,执行put(…)操作的时候,队列满了,生产者线程被阻塞;执行take()的时候,队列为空,消费者线程被阻塞。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //...final Object[] items;int takeIndex;int putIndex;int count;// 一把锁+两个条件final ReentrantLock lock;private final Condition notEmpty; private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0)throw new IllegalArgumentException(); this.items = new Object[capacity];// 构造器中创建一把锁加两个条件lock = new ReentrantLock(fair); // 构造器中创建一把锁加两个条件notEmpty = lock.newCondition(); // 构造器中创建一把锁加两个条件notFull = lock.newCondition(); }public void put(E e) throws InterruptedException { Objects.requireNonNull(e);final ReentrantLock lock = this.lock; lock.lockInterruptibly();try {while (count == items.length) // 非满条件阻塞,队列容量已满 notFull.await();enqueue(e); } finally {lock.unlock(); }}private void enqueue(E e) { final Object[] items = this.items; items[putIndex] = e;if (++putIndex == items.length) putIndex = 0; count++;// put数据结束,通知消费者非空条件 notEmpty.signal();}public E take() throws InterruptedException { final ReentrantLock lock = this.lock;lock.lockInterruptibly(); try {while (count == 0)// 阻塞于非空条件,队列元素个数为0,无法消费 notEmpty.await();return dequeue(); } finally {lock.unlock(); }}private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked")E e = (E) items[takeIndex]; items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0; count--;if (itrs != null)itrs.elementDequeued();// 消费成功,通知非满条件,队列中有空间,可以生产元素了。 notFull.signal();return e; }// ...
}
3.3,Condition实现原理
可以发现,Condition的使用很方便,避免了wait/notify的生产者通知生产者,消费者通知消费者的问题。具体实现如下:
由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。首先查看互斥锁和读写锁中Condition的构造方法
public class ReentrantLock implements Lock, java.io.Serializable { // ...public Condition newCondition() {return sync.newCondition(); }
}public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // ...private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; // ...public static class ReadLock implements Lock, java.io.Serializable { // 读锁不支持Conditionpublic Condition newCondition() {// 抛异常throw new UnsupportedOperationException(); }}public static class WriteLock implements Lock, java.io.Serializable { // ...public Condition newCondition() {return sync.newCondition(); }// ... }// ...
}
首先,读写锁中的ReadLock是不支持Condition的,读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {// Condition的所有实现,都在ConditionObject类中 }
}public class ReentrantLock implements Lock, java.io.Serializable {abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() {return new ConditionObject(); }}
}
每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列,如下:
public class ConditionObject implements Condition, java.io.Serializable { //transient 指示某个字段在序列化对象时不应被序列化。private transient Node firstWaiter;private transient Node lastWaiter;
}
static final class Node {volatile Node prev;volatile Node next;volatile Thread thread; Node nextWaiter;
}
3.4,await()实现分析
public final void await() throws InterruptedException {// 刚要执行await()操作,收到中断信号,抛异常if (Thread.interrupted())throw new InterruptedException();// 加入Condition的等待队列Node node = addConditionWaiter();// 阻塞在Condition之前必须先释放锁,否则会死锁int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 阻塞当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 重新获取锁if (acquiraeQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)// 被中断唤醒,抛中断异常reportInterruptAfterWait(interruptMode);
}
- 线程调用await()的时候,肯定已经先拿到了锁。所以,在addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程是安全的,代码如下:
private Node addConditionWaiter() {// ...Node t = lastWaiter;// ...Node node = new Node(Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
-
在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
-
线程从wait中被唤醒后,必须用acquiraeQueued(node, savedState)方法重新拿锁。
-
checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:
- 一种是其他线程调用了unpark;
- 另一种是收到中断信号。
-
这里的await()方法是可以响应中断,所以当发现自己被中断唤醒的,而不是被unpark唤醒时,会直接退出while循环,await()方法也会返回。
-
isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node值在Condition的队列里,而不在AQS的队列里。但执行notify操作的时候,会放进AQS的同步队列。
3.5,awaitUniterruptibly()实现分析
与await()不同,awaitUninterruptibly()不会响应中断,其方法的定义中不会有中断异常抛出,下面分析其实现和await()的区别。
public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);//当线程唤醒后,如果被中断过,仅记录,不处理,继续进行while循环if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();
}
可以看出,整体代码和await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环。
3.6,notify()实现分析
public final void signal() {// 只有持有锁的线程,才有资格调用signal()方法if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)// 发起通知doSignal(first);
}// 唤醒队列中的第1个线程
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) && (first = firstWaiter) != null);
}final boolean transferForSignal(Node node) {if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))return false;// 先把Node放入互斥锁的同步队列中,再调用unpark方法Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
同await()一样,在调用notify()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。然后从队列中取出firstWaiter,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await()方法里面的判断条件:while (!isOnSyncQueue(node)) ,这个判断条件满足,说明await线程不是被中断的,而是被unpark唤醒的。
4,StampedLock
4.1,StampedLock使用场景
锁 | 并发度 |
---|---|
ReentrantLock | 读与读互斥,读与写互斥,写与写互斥 |
ReentrantReadWriteLock | 读与读不互斥,读与写互斥,写与写互斥 |
StampedLock | 读与读不互斥,读与写不互斥,写与写互斥 |
可以看到,从ReentrantLock到StampedLock,并发度依次提高。StampedLock是如何做到“读”与“写”也不互斥、并发地访问的呢?MySQL 高并发的核心机制 MVCC,也就是一份数据多个版本,此处的StampedLock有异曲同工之妙。
另一方面,因为ReentrantLock采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。 虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。
class Point {private double x,y;private final StampedLock s1 = new StampedLock();//多个线程调用该函数,修改x,y的值void move(double deltaX, double deltaY) {long stamp = s1.writeLock();try {x += deltaX;y += deltaY;} finally {s1.unlockWrite(stamp);}}//多个线程调用该函数,求距离。使用“乐观读”将共享变量拷贝到线程栈中。//读的期间,其他线程修改了共享变量(读到脏数据),放弃。升级为悲观锁。double distanceFromOrigin() {long stamp = s1.tryOptimisticRead();double currentX = x, currentY = y;if (!s1.validate(stamp)) {stamp = s1.readLock();try {currentX = x;currentY = y;} finally {s1.unlockRead(stamp);}}return Math.sqrt(currentX*currentX + currentY*currentY);}
}
首先,执行move操作的时候,要加锁。这个用法和ReadWriteLock的用法没有区别,写操作和写操作也是互斥的。关键在于读的时候,用了一个“乐观读”sl.tryOptimisticRead(),相当于在读之前给数据的状态做了一个“快照”。然后,把数据拷贝到内存里面,在用之前,再比对一次版本号。如果版本号变了,则说明在读的期间有其他线程修改了数据。读出来的数据废弃,重新获取读锁。 关键代码就是下面这三行:
//在读之前,获取数据的版本号。
//读:将一份数据拷贝到线程的栈内存中
//读之后:将读之前的版本号和当前版本号比对,判断读出来数据是否可以使用(期间没有被其他线程修改)
long stamp = s1.tryOptimisticRead();
double currentX = x, currentY = y;
if (!s1.validate(stamp)) {
要说明的是,这三行关键代码对顺序非常敏感,不能有重排序。因为 state 变量已经是volatile,所以可以禁止重排序,但stamp并不是volatile 的。为此,在 validate(stamp) 函数里面插入内存屏障。
4.2,“乐观读”的实现原理
首先,StampedLock是一个读写锁,因此也会像读写锁那样,把一个state变量分成两半,分别表示读锁和写锁的状态。同时,它还需要一个数据的version。但正如前面所说,一次CAS没有办法操作两个变量,所以这个state变量本身同时也表示了数据的version。下面先分析state变量。
public class StampedLock implements java.io.Serializable {private static final long RUNIT = 1L;//第8位表示写锁private static final long WBIT = 1L << LG_READERS;//最低的7位表示读锁private static final long RBITS = WBIT - 1L;//读锁的数目private static final long RFULL = RBITS - 1L;//读锁和写锁的状态整合到一起private static final long ABITS = RBITS | WBIT;private static final long SBITS = ~RBITS; // note overlap with ABITS// Initial value for lock state; avoid failure value zeroprivate static final long ORIGIN = WBIT << 1;//state的初始值private transient volatile long state;
用最低的8位表示读和写的状态,其中第8位表示写锁的状态,最低的7位表示读锁的状态。因为写锁只有一个bit位,所以写锁是不可重入的。
初始值不为0,而是把WBIT 向左移动了一位,也就是上面的ORIGIN 常量,构造函数如下所示。
// Initial value for lock state; avoid failure value zero
private static final long ORIGIN = WBIT << 1;public StampedLock() {state = ORIGIN;
}
为什么state的初始值不设为0呢?初始值不是0的原因主要是为了确保在未加锁时,写锁的版本号和乐观读锁的戳能明确表示出状态。
//在读之前,获取数据的版本号。
//读:将一份数据拷贝到线程的栈内存中
//读之后:将读之前的版本号和当前版本号比对,判断读出来数据是否可以使用(期间没有被其他线程修改)
long stamp = s1.tryOptimisticRead();
double currentX = x, currentY = y;
if (!s1.validate(stamp)) {
public long tryOptimisticRead() {long s;return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {U.loadFence();return (stamp & SBITS) == (state & SBITS);
}
- 当state&WBIT!=0的时候,说明有线程持有写锁,上面的tryOptimisticRead会永远返回0。这样,再调用validate(stamp),也就是validate(0)也会永远返回false。
- 当有线程持有写锁的时候,validate永远返回false,无论写线程是否释放了写锁。因为无论是否释放了(state回到初始值)写锁,state值都不为0,所以validate(0)永远为false。
【问题】为什么上面的validate()函数不直接比较stamp=state,而要比较state&SBITS=state&SBITS 呢?
【答案】因为读锁和读锁是不互斥的!所以,即使在“乐观读”的时候,state 值被修改了,但如果它改的是第7位,validate()还是会返回true。
另外要说明的一点是,上面使用了内存屏障 U.loadFence(),是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile
的,由此可以禁止其和前面的currentX=X,currentY=Y进行重排序。通过上面的分析,可以发现state的设计非常巧妙。只通过一个变量,既实现了读锁、写锁的状态记录,还实现了数据的版本号的记录。
4.3,悲观读/写:“阻塞”和“自旋”策略实现差异
和ReadWriteLock一样,StampedLock也要进行悲观的读锁和写锁操作。不过它不是基于AQS实现的,而是内部重新实现了一个阻塞队列。
static final class WNode {volatile WNode prev;volatile WNode next;volatile WNode cowait; // list of linked readersvolatile Thread thread; // non-null while possibly parkedvolatile int status; // 0, WAITING, or CANCELLEDfinal int mode; // RMODE or WMODEWNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
这个阻塞队列和 AQS 里面的很像。刚开始的时候,whead=wtail=NULL,然后初始化,建一个空节点,whead和wtail都指向这个空节点,之后往里面加入一个个读线程或写线程节点。但基于这个阻塞队列实现的锁的调度策略和AQS很不一样,也就是“自旋”。
- 在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列,并且进入阻塞状态。
- 但在StampedLock中,CAS state失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。为此,根据CPU的核数,定义了自旋次数的常量值。如果是单核的CPU,线程被调度的上下文切换可能会使自旋锁不如传统的阻塞锁有效。在多核情况下,自旋策略可以更有效地利用 CPU 资源,因为多个线程可以同时运行。
private static final int NCPU = Runtime.getRuntime().availableProcessors();private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
下面以写锁的加锁,也就是StampedLock的writeLock()函数为例,来看一下自旋的实现。
public long writeLock() {long s, next; return ((((s = state) & ABITS) == 0L &&U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?next : acquireWrite(false, 0L));
}
如上面代码所示,当state&ABITS==0的时候,说明既没有线程持有读锁,也没有线程持有写锁,此时当前线程才有资格通过CAS操作state。若操作不成功,则调用acquireWrite()函数进入阻塞队列,并进行自旋,这个函数是整个加锁操作的核心,代码如下。
private long acquireWrite(boolean interruptible, long deadline) {WNode node = null, p;//入队列时自选for (int spins = -1;;) { // spin while enqueuinglong m, s, ns;if ((m = (s = state) & ABITS) == 0L) {if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//自旋的时候拿到了锁,函数返回return ns;}else if (spins < 0)spins = (m == WBIT && wtail == whead) ? SPINS : 0;else if (spins > 0) {if (LockSupport.nextSecondarySeed() >= 0)//不断自旋,以一定的概率把spins值往下累减--spins;}else if ((p = wtail) == null) { //初始化队列WNode hd = new WNode(WMODE, null);if (U.compareAndSwapObject(this, WHEAD, null, hd))wtail = hd;}else if (node == null)node = new WNode(WMODE, p);else if (node.prev != p)node.prev = p;else if (U.compareAndSwapObject(this, WTAIL, p, node)) {p.next = node;//for循环唯一的break,CAS tail成功(成功加入队列尾部),才会退出for循环break;}}for (int spins = -1;;) {WNode h, np, pp; int ps;if ((h = whead) == p) {if (spins < 0)spins = HEAD_SPINS;else if (spins < MAX_HEAD_SPINS)spins <<= 1;for (int k = spins;;) { // spin at headlong s, ns;//再次尝试拿锁if (((s = state) & ABITS) == 0L) {if (U.compareAndSwapLong(this, STATE, s,ns = s + WBIT)) {whead = node;node.prev = null;return ns;}}else if (LockSupport.nextSecondarySeed() >= 0 &&--k <= 0) //不断自旋break;}}else if (h != null) { // help release stale waitersWNode c; Thread w;//自己从阻塞中唤醒,然后唤醒cowait中所有reader线程while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&(w = c.thread) != null)U.unpark(w);}}if (whead == h) {if ((np = node.prev) != p) {if (np != null)(p = np).next = node; // stale}else if ((ps = p.status) == 0)U.compareAndSwapInt(p, WSTATUS, 0, WAITING);else if (ps == CANCELLED) {if ((pp = p.prev) != null) {node.prev = pp;pp.next = node;}}else {long time; // 0 argument to park means no timeoutif (deadline == 0L)time = 0L;else if ((time = deadline - System.nanoTime()) <= 0L)return cancelWaiter(node, node, false);Thread wt = Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);node.thread = wt;if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&whead == h && node.prev == p)//进入阻塞状态,之后被另外一个线程release唤醒,接着往下执行这个for循环U.park(false, time);node.thread = null;U.putObject(wt, PARKBLOCKER, null);if (interruptible && Thread.interrupted())return cancelWaiter(node, node, true);}}}}
整个acquireWrite()函数是两个大的for循环,内部实现了非常复杂的自旋策略。在第一个大的for循环里面,目的就是把该Node加入队列的尾部,一边加入,一边通过CAS操作尝试获得锁。如果获得了,整个函数就会返回;如果不能获得锁,会一直自旋,直到加入队列尾部。
在第二个大的for循环里,也就是该Node已经在队列尾部了。这个时候,如果发现自己刚好也在队列头部,说明队列中除了空的Head节点,就是当前线程了。此时,再进行新一轮的自旋,直到达到MAX_HEAD_SPINS次数,然后进入阻塞。这里有一个关键点要说明:当release()函数被调用之后,会唤醒队列头部的第1个元素,此时会执行第二个大的for循环里面的逻辑,也就是接着for循环里面park()函数后面的代码往下执行。
另外一个不同于AQS的阻塞队列的地方是,在每个WNode里面有一个cowait指针,用于串联起所有的读线程。例如,队列尾部阻塞的是一个读线程 1,现在又来了读线程 2、3,那么会通过cowait指针,把1、2、3串联起来。1被唤醒之后,2、3也随之一起被唤醒,因为读和读之间不互斥。
释放锁: 和读写锁的实现类似,也是做了两件事情:一是把state变量置回原位,二是唤醒阻塞队列中的第一个节点。节点被唤醒之后,会继续执行上面的第二个大的for循环,自旋拿锁。如果成功拿到,则出队列;如果拿不到,则再次进入阻塞,等待下一次被唤醒。
// java.util.concurrent.locks.StampedLock#unlockWrite
public void unlockWrite(long stamp) {WNode h;if (state != stamp || (stamp & WBIT) == 0L)throw new IllegalMonitorStateException();state = (stamp += WBIT) == 0L ? ORIGIN : stamp;if ((h = whead) != null && h.status != 0)release(h);
}
// 唤醒队列的队首节点【头结点whead的后继节点】
private void release(WNode h) {if (h != null) {WNode q; Thread w;U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 将头结点状态从-1变为0,标识要唤醒其后继节点if ((q = h.next) == null || q.status == CANCELLED) { // 判断头结点的后继节点是否为null或状态为取消for (WNode t = wtail; t != null && t != h; t = t.prev) // 从队尾查找距头结点最近的状态为等待的节点if (t.status <= 0)q = t; // 赋值}if (q != null && (w = q.thread) != null)U.unpark(w); // 唤醒队首节点}
}
相关文章:
Java并发:互斥锁,读写锁,公平锁,Condition,StampedLock
阅读本文之前可以看一看 Java 多线程基础: Java:多线程(进程线程,线程状态,创建线程,线程操作) Java:多线程(同步死锁,锁&原子变量,线程通信&…...
在 Linux 中,要让某一个线程或进程排他性地独占一个 CPU
文章目录 1. CPU 亲和性(CPU Affinity)2. 中断隔离(IRQ Isolation)3. 系统 tickless 模式(NoHZ Mode)4. 实时调度策略5. CPU 隔离(CPU Isolation)和 Full CPU Isolation实现最低的延迟抖动在 Linux 中,要让某一个线程 排他性地独占一个 CPU,并且进一步隔离中断(包括…...
滚雪球学MySQL[7.3讲]:数据库日志与审计详解:从错误日志到审计日志的配置与使用
全文目录: 前言7.3 日志与审计1. 日志类型与配置1.1 错误日志(Error Log)配置错误日志使用场景案例演示 1.2 慢查询日志(Slow Query Log)配置慢查询日志使用场景案例演示 1.3 查询日志(General Query Log&a…...
网关的作用及其高可用性设计详解
引言 在现代分布式系统架构中,网关(Gateway)是一个关键组件。它作为客户端与后端服务之间的桥梁,不仅提供了请求路由、负载均衡、安全认证、流量控制等功能,还能够保护后端服务的安全和稳定性。网关的设计和高可用性对…...
Vortex GPGPU的github流程跑通与功能模块波形探索
文章目录 前言一、跟着官方文档走一遍二、cache子模块的波形仿真2.1 必要的文件内容解释2.2 cache子模块波形仿真——目前环境没啥问题了,就vcd因为配置问题出不来 总结 前言 看了那么久的verilog代码和文档,但还是没怎么接触过Vortex GPGPU全流程跑通与…...
10.2 Linux_并发_进程相关函数
创建子进程 函数声明如下: pid_t fork(void); 返回值:失败返回-1,成功返回两次,子进程获得0(系统分配),父进程获得子进程的pid 注意:fork创建子进程,实际上就是将父进程复制一遍作为子进程&…...
【深度学习基础模型】玻尔兹曼机BM|受限玻尔兹曼机RBM|深度置信网络DBN详细理解并附实现代码。
【深度学习基础模型】玻尔兹曼机Boltzmann machines (BM)|受限玻尔兹曼机Restricted Boltzmann machines (RBM)|深度置信网络Deep belief networks (DBN)详细理解并附实现代码。 【深度学习基础模型】玻尔兹曼机Boltzmann machines (BM)|受限玻尔兹曼机Restricted Boltzmann m…...
滑动窗口->dd爱框框
1.题目: 2.题解: 2.1为什么用滑动窗口优化: 因为元素都是大于0的 所以:当找到大于等于x的值时,right可以不用返回 两个指针都往后走;因此可以使用滑动窗口优化暴力解法 2.2:滑动窗口具体使用步…...
Python从入门到高手4.1节-掌握条件控制语句
目录 4.1.1 理解条件控制 4.1.2 if, elif, else 4.1.3 条件表达式 4.1.4 条件控制可以嵌套 4.1.5 if语句的三元运算 4.1.6 国庆节快乐 4.1.1 理解条件控制 在日常生活中,我们常喜欢说如果, "如果怎么样,那么就会怎么样"。"如果&qu…...
使用Qt实现实时数据动态绘制的折线图示例
基于Qt的 QChartView 和定时器来动态绘制折线图。它通过动画的方式逐步将数据点添加到图表上,并动态更新坐标轴的范围,提供了一个可以实时更新数据的折线图应用。以下是对代码的详细介绍及其功能解析: 代码概述 该程序使用Qt的 QChartView…...
【人人保-注册安全分析报告-无验证方式导致安全隐患】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 1. 暴力破解密码,造成用户信息泄露 2. 短信盗刷的安全问题,影响业务及导致用户投诉 3. 带来经济损失,尤其是后付费客户,风险巨大,造…...
Redis6 多线程模型
优质博文:IT-BLOG-CN 一、单线程的优缺点 对于一个请求操作Redis主要做3件事情:从客户端读取数据/解析、执行Redis命令、回写数据给客户端。所以主线程其实就是把所有操作的这3件事情串行一起执行,因为是基于内存,所以执行速度非…...
Python的异步编程
什么是协程? 协程不是计算机系统提供,程序员人为创造。 协程也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。 实现协程有那么几种方法: greenlet&…...
初识Linux · 进程等待
目录 前言: 进程等待是什么 为什么需要进程等待 进程等待都在做什么 前言: 通过上文的学习,我们了解了进程终止,知道终止是在干什么,终止的三种情况,以及有了退出码,错误码的概念ÿ…...
面向对象建模
UML 关系 UML 关系主要有:依赖、关联、聚合、组合、实现、继承。 类图 #mermaid-svg-jcAjcVcPmgmWDpcI {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-jcAjcVcPmgmWDpcI .error-icon{fill:#552222;}#m…...
MetaJUI v0.4 遇到的一些问题及解决办法记录
1、Unity3d 版本 2022.3.29f1。 2、MetaJUI v0.4 的下载,https://download.csdn.net/download/xingchengaiwei/89334848 3、将MetaJUI v0.4解压,用Unity3d 打开项目,会出现如下问题,按照图中提示操作即可。 4、打开工程后会出现…...
从零开始学习OMNeT++系列第二弹——新建一个OMNeT++的工程
上一篇第一弹介绍了OMNeT是什么以及如何安装OMNeT,现在来说一下如何新建一个自己的OMNeT的工程。 在 Omnet安装完成后,samples/tictoc 中有该例子的完整文件,你可以立刻运行该文件看他是怎么工作的,不过更推荐按接下来的步骤一步…...
【Android】布局优化—include,merge,ViewStub的使用方法
引言 1.重要性 在Android应用开发中,布局是用户界面的基础。一个高效的布局不仅能提升用户体验,还能显著改善应用的性能。随着应用功能的复杂性增加,布局的优化变得尤为重要。优化布局能够减少渲染时间,提高响应速度,…...
传奇外网架设教程带图文解说—Gee引擎
架设前准备工作: ①通过百度网盘下载版本、补丁、客户端和DBC2000。版本解压到D盘,客户端解压到D盘或是E盘,补丁先不解压 ②安装和配置DBC2000,有些版本不一定用的是DBC2000数据库,看引擎默认的数据库是哪个 DBC数据…...
MySQL | excel数据输出insert语句
需求 在日常生产运维过程中,有很多需要进行人工梳理的excel数据,到了研发这一侧需要转为sql语句进行数据修正,如何输出insert插入语句? 方案 在空白列插入,选择需要的列 "INSERT INTO tab_name1 (name, desc) …...
足球青训俱乐部管理:Spring Boot技术驱动
摘 要 随着社会经济的快速发展,人们对足球俱乐部的需求日益增加,加快了足球健身俱乐部的发展,足球俱乐部管理工作日益繁忙,传统的管理方式已经无法满足足球俱乐部管理需求,因此,为了提高足球俱乐部管理效率…...
一次实践:给自己的手机摄像头进行相机标定
文章目录 1. 问题引入2. 准备工作2.1 标定场2.2 相机拍摄 3. 基本原理3.1 成像原理3.2 畸变校正 4. 标定解算4.1 代码实现4.2 详细解析4.2.1 解算实现4.2.2 提取点位 4.3 解算结果 5. 问题补充 1. 问题引入 不得不说,现在的计算机视觉技术已经发展到足够成熟的阶段…...
【docker学习】Linux系统离线方式安装docker环境方法
centos7-linux安装docker(离线方式) 下载docker的安装文件 https://download.docker.com/linux/static/stable/x86_64/ 下载的是:docker-18.06.3-ce.tgz 这个压缩文件 将docker-18.06.3-ce.tgz文件上传到centos7-linux系统上,用ftp工具上传即可 解压…...
vscode开发uniapp安装插件指南
安装vuets的相关插件 首先是vue的相关插件,目前2024年9月应该是vue-offical 安装uniapp开发插件 uni-create-view :快速创建 uni-app 页面 安装uni-create-view之后修改插件拓展设置 勾选第一个选择创建视图时创建同名文件夹 选择第二个创建文件夹中生…...
Elasticsearch7.7.1集群不能相互发现的问题解决以及Elasticsearch7.7.1安装analysis-ik中文分词插件的应用
一、Elasticsearch7.7.1集群不能相互发现的问题解决 在使用elasticsearch7.7.1搭建集群,使用了3台服务器作为节点,但在搭建的过程中发现每台服务器的elasticsearch服务都正常,但是不能相互发现,期间进行了一些配置的修改偶尔出现了…...
蓝牙Mesh介绍
蓝牙Mesh(Bluetooth Mesh)是一种基于蓝牙技术的无线通信网络拓扑,用于在设备之间创建大规模的多点到多点网络。蓝牙Mesh网络可以让多个蓝牙设备相互通信和协作,适合需要高覆盖范围和高可靠性的场景,例如智能家居、工业…...
Qt 窗口中鼠标点击事件的坐标探讨
// 鼠标点击事件 void Widget::mousePressEvent(QMouseEvent *event) {/*event->pos()、event->windowPos()和event->localPos()都表示鼠标点击位置在窗口中的位置,它们的值都是一样的,区别在于event->pos()是QPoint类型,event-&…...
服务器虚拟化的全面指南
1. 引言 在数字化转型的浪潮中,服务器虚拟化成为现代IT基础设施的核心组成部分。它通过将物理服务器资源分割成多个虚拟资源,极大地提高了资源利用率和灵活性。本篇文章将深入探讨服务器虚拟化的概念、优势、挑战、技术工具、最佳实践及未来发展趋势。 …...
Linux启动mysql报错
甲方公司意外停电,所有服务器重启后,发现部署在Linux上的mysql数据库启动失败.再加上老员工离职,新接手项目,对Linux系统了解不多,解决起来用时较多,特此记录。 1.启动及报错 1.1 启动语句1 启动语句1&a…...
基于大数据的二手房价数据可视化系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…...
手机怎样制作个人网站/如何弄一个自己的网站
将List类型转化为Json,是我们平常开发时最常见的了。在使用中,有很多种方法,也可以使用。 第一种 第三方组件:Newtonsoft.Json.dll //转化成Json Newtonsoft.Json.JsonConvert.SerializeObject(obj);//反序列化 Newtonsoft.Json.J…...
企业网站建设合同(一)/杭州网站seo
文章目录数据集格式基于线性回归sigmoid实现二分类的表达式链式法则求导链式表达式求解∂l∂oj^\frac{\partial l}{\partial \hat{o_j}}∂oj^∂l求解∂oj∂wij\frac{\partial o_j}{\partial w_{ij}}∂wij∂oj求解∂oj∂bj\frac{\partial o_j}{\partial b_j}∂bj∂…...
深圳英文网站设计/广州网站营销seo费用
存储基础知识三 文件系统 2009-10-09 17:32:58标签:存储 文件系统 [推送到技术圈] 一、概述文件系统定义了把文件存储于磁盘时所必须的数据结构及磁盘数据的管理方式。我们知道,磁盘是由很多个扇区(Sector)组成的ÿ…...
专做医药中间体的网站/青岛新闻最新今日头条
最近有客户想对数据库的用户权限做限制,需要用到copy的功能,不能确定其需要的权限,测试了一下: highgo# \c highgo testc highgo> select user;current_user --------------testc (1 row)highgo> copy testcopyto to /hgd…...
南宁 网站建设 制作/营销型网站建设策划书
本篇的话题,讨论Java类的加载、链接和初始化。Java字节代码的表现形式是字节数组(byte[]),而Java类在JVM中的表现形式是java.lang.Class类的对象。一个Java类从字节代码到能够在JVM中被使用,需要经过加载、链接和初始化…...
2017网站开发新技术/seo模拟点击软件
计划上:在之前的想了一下怎么设计 通过定义运算符优先级 分数的话将它转化成小数 然后通过不断简化算式来计算结果 实际操作上:在定义优先级的时候 将乘除定义的比加减多一个优先级 括号内的可以用栈来实现 搜索查到(就入栈直至检查到&#…...