当前位置: 首页 > news >正文

Linux自旋锁

面对没有获取锁的现场,通常有两种处理方式。

  • 互斥锁:堵塞自己,等待重新调度请求
  • 自旋锁:循环等待该锁是否已经释放

本文主要讲述自旋锁

自旋锁其实是一种很乐观的锁,他认为只要再等一下下锁便能释放,避免了操作系统进程调度和线程切换。

自旋锁演进

  1. 传统TAS自旋锁:只有0、1两张状态,线程无序抢锁,不公平
  2. ticket based自旋锁:状态表示为两个counter——owner、next,线程有序持有锁,公平持锁
  3. queued自旋锁:只有一个线程自旋在spinlock,其他线程在自己的per cpu mcs lock自旋

ticket spinlock

ticket spinlock通过当前服务号来确定当前获取锁的对象,每一个对象在获取锁时都会获取到自己的服务号,当当前服务号与自己服务号一致时,就说明获取到锁

这有些像去外面吃饭等餐,先去服务员那里取号,然后空出位置来,服务员就会叫下一个号

typedef struct {union {u32 slock;struct __raw_tickets {
#ifdef __ARMEB__u16 next;u16 owner;
#elseu16 owner;u16 next;
#endif} tickets;};
} arch_spinlock_t;

以下以ARMv6 体系结构的 ticket-based 自旋锁为例

static inline void arch_spin_lock(arch_spinlock_t *lock)
{// 临时变量,用于存储中间结果unsigned long tmp;// 新的锁值u32 newval;// 当前锁的值arch_spinlock_t lockval;// 预取指令,提前加载 lock->slock 到缓存,以减少锁操作的内存访问延迟prefetchw(&lock->slock);// 使用 ARM 汇编代码实现的原子操作:// •	ldrex %0, [%3]:加载并排他地读取 lock->slock 的值到 lockval。// •	add %1, %0, %4:将 lockval 增加一个票数,存储到 newval。// •	strex %2, %1, [%3]:尝试将 newval 写回 lock->slock,并将结果存储到 tmp。// •	teq %2, #0:检查 tmp 是否为 0,表示写回操作是否成功。// •	bne 1b:如果写回操作失败,跳回到 1 继续尝试。__asm__ __volatile__(
"1:	ldrex	%0, [%3]\n"
"	add	%1, %0, %4\n"
"	strex	%2, %1, [%3]\n"
"	teq	%2, #0\n"
"	bne	1b": "=&r" (lockval), "=&r" (newval), "=&r" (tmp): "r" (&lock->slock), "I" (1 << TICKET_SHIFT): "cc");// 检查当前票数是否为锁的所有者票数,确保锁按照票数顺序被获取。如果当前票数不是所有者票数,处理器将进入低功耗等待状态,直到锁的所有者票数更新为当前票数while (lockval.tickets.next != lockval.tickets.owner) {wfe();lockval.tickets.owner = READ_ONCE(lock->tickets.owner);}// 内存屏障,确保在获取锁后所有的内存操作在锁的获取之后发生。这是因为 ARMv6 CPU 假设内存是弱顺序的(weakly ordered),需要内存屏障来保证操作顺序smp_mb();
}// 解锁
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{smp_mb();lock->tickets.owner++;dsb_sev();
}

在旧的自旋锁实现中,所有争用一个锁的处理器都会争来争去,看谁能先得到它。现在他们整齐地排队等候,按到达的顺序抢锁。多线程的运行时间甚至减少了,最大延迟也减少了(更重要的是,使其成为确定性的)。Nick说,新实现有轻微的成本,但在当代处理器上的成本非常小,相对于缓存未命中的成本基本上为零——这是处理争用锁时常见的事件。x86的维护者显然认为,消除自旋锁不合适的争夺所带来的好处超过了这个小成本;其他人似乎不太可能不同意。

以下是go实现版本

type ticketLock struct {// 记录当前服务号nowServing uint32// 记录下一个服务号nextTicket uint32
}// NewTicketLock instantiates a ticket-lock.
func NewTicketLock() *ticketLock {return &ticketLock{}
}// Lock locks the ticket-lock.
func (t *ticketLock) Lock() {// 递增下一个服务号,并获取当前服务号myTicket := atomic.AddUint32(&t.nextTicket, 1) - 1// 自旋等待直到轮到自己for myTicket != atomic.LoadUint32(&t.nowServing) {runtime.Gosched()}
}// Unlock unlocks the ticket-lock.
func (t *ticketLock) Unlock() {// 递增当前服务号atomic.AddUint32(&t.nowServing, 1)
}

mcs spinlock

mcs是简单的自旋锁,具有公平性,每个CPU在尝试获取锁时会在本地变量上自旋,从而避免了test-and-set自旋锁实现中的昂贵缓存争用。

mcsNode问题在于空间占用大,这对于操作系统并不是好消息

#ifndef __LINUX_MCS_SPINLOCK_H
#define __LINUX_MCS_SPINLOCK_H#include <asm/mcs_spinlock.h>struct mcs_spinlock {// 指向下一个等待的节点struct mcs_spinlock *next;// 表示锁是否被获取。1已获取0未获取int locked; /* 1 if lock acquired */// 用于嵌套计数int count;  /* nesting count, see qspinlock.c */
};#ifndef arch_mcs_spin_lock_contended
// 使用smp_cond_load_acquire提供获取语义,确保后续操作在锁被获取后进行
#define arch_mcs_spin_lock_contended(l)					\
do {									\smp_cond_load_acquire(l, VAL);					\
} while (0)
#endif#ifndef arch_mcs_spin_unlock_contended// 使用smp_store_release提供内存屏障,确保临界区的所有操作在解锁前完成
#define arch_mcs_spin_unlock_contended(l)				\smp_store_release((l), 1)
#endifstatic inline
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{struct mcs_spinlock *prev;// 初始化节点,将locked设为0,next设为NULLnode->locked = 0;node->next   = NULL;// 使用xchg函数交换锁变量和当前节点,获取前驱节点。// 通过使用 xchg(),我们确保了在 xchg() 之前对 @node 的所有初始化存储操作是正确排序的,并且这些操作在全局范围内被其他处理器或线程正确地看到。此外,这个 xchg() 指令还提供了与锁相关的 ACQUIRE 排序,以确保在获取锁时正确的内存可见性。prev = xchg(lock, node);// 如果前驱节点为空,说明锁已被获取,直接返回if (likely(prev == NULL)) {return;}// 否则,将前驱节点的next指针指向当前节点,并等待锁持有者传递锁WRITE_ONCE(prev->next, node);// 在节点的 locked 变量上自旋,直到前一个持有锁的线程将其设置为解锁状态arch_mcs_spin_lock_contended(&node->locked);
}/** Releases the lock. The caller should pass in the corresponding node that* was used to acquire the lock.*/
static inline
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{// 获取当前节点的下一个节点指针struct mcs_spinlock *next = READ_ONCE(node->next);// 如果没有下一个节点,尝试将锁设为NULL(使用cmpxchg_release)if (likely(!next)) {// 如果成功,直接返回if (likely(cmpxchg_release(lock, node, NULL) == node))return;// 否则,等待下一个节点的next指针被设置while (!(next = READ_ONCE(node->next)))cpu_relax();}// 将锁传递给下一个等待节点arch_mcs_spin_unlock_contended(&next->locked);
}#endif /* __LINUX_MCS_SPINLOCK_H */

以下是尝试仿照得到的go实现

type mcsNode struct {next    *mcsNodewaiting uint32
}type mcsLock struct {tail *mcsNode
}func newMCSLock() *mcsLock {return &mcsLock{}
}func (m *mcsLock) Lock(node *mcsNode) {// 初始化 node 节点,将其 next 设为 nil,并将 waiting 设为 1,表示该节点正在等待node.next = nilnode.waiting = 1// 使用 atomic.SwapPointer 将锁的 tail 指向当前节点,并返回先前的 tail(即前一个等待锁的节点)predecessor := (*mcsNode)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&m.tail)), unsafe.Pointer(node)))// 如果先前没有其他等待节点(即 pred 为 nil),当前节点直接获得锁。if predecessor != nil {// 如果有前一个节点,将当前节点的 next 指向当前节点,并在 waiting 变量上自旋等待,直到前一个节点将 waiting 设为 0,表示锁已释放predecessor.next = nodefor atomic.LoadUint32(&node.waiting) == 1 {// 在等待期间,使用 runtime.Gosched() 函数让出当前线程,以便其他线程可以获得CPU时间片runtime.Gosched()}}
}func (m *mcsLock) Unlock(node *mcsNode) {// 如果当前节点的 next 为 nil,表示没有其他节点在等待锁,直接将 tail 设为 nil,释放锁if node.next == nil {if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&m.tail)), unsafe.Pointer(node), nil) {return}// 如果 CAS 操作失败,表示有其他节点在等待锁,等待其它节点将 waiting 设为 0for node.next == nil {runtime.Gosched()}}// 如果当前节点的 next 不为 nil,将 next 节点的 waiting 设为 0,表示锁已释放atomic.StoreUint32(&node.next.waiting, 0)
}

qspinlock

qspinlock基于MCS锁(队列锁),并结合锁状态,旨在提高多处理器系统中的性能,特别是在NUMA(非统一内存访问)系统中

MCS锁在NUMA系统中表现得接近最佳,因为每个CPU只关注自己的队列条目,极大减少了处理器之间的缓存行传递

其主要原理如下:

  1. 基于MCS锁的队列机制
    • 每个等待获取锁的CPU在一个每CPU结构数组中有一个专用的mcs_spinlock结构。
    • 当锁被占用时,CPU会将自己添加到一个队列中,并在自己的mcs_spinlock结构上自旋,减少对共享锁变量的访问,从而减少缓存行的争用。
  2. 32位锁字段设计
    • 锁字段被分为多个部分,包括一个整数计数器(类似于锁定字段)、一个两位索引字段(指示队列尾部的条目)、一个单个位的“pending”字段和一个整数字段(保存队列尾部的CPU编号)
  3. 优化策略
    • 当CPU是下一个获取锁的候选者时,它会直接在锁本身上自旋,而不是在其每CPU结构上自旋,减少缓存行的访问
    • 如果锁被占用但没有其他CPU在等待,CPU会设置“pending”位,不使用其mcs_spinlock结构,直到有第二个CPU加入队列
    • 为了防止锁在某个节点上过长时间停留,如果主队列在一定时间内(默认10毫秒)没有清空,整个次要队列将被提升到主队列的头部,强制锁转移到另一个节点。
typedef struct qspinlock {union {atomic_t val;/** By using the whole 2nd least significant byte for the* pending bit, we can allow better optimization of the lock* acquisition for the pending bit holder.*/
#ifdef __LITTLE_ENDIANstruct {// 持锁状态u8	locked;// 是否有pending线程。// 1表示有thread正自旋在spinlock上,0表示没有pending threadu8	pending;};struct {// 锁位和等待位u16	locked_pending;// 指向mcs node队列的尾部节点// 这个队列中的thread有两种状态:头部的节点对应的thread自旋在pending+locked域,其他节点自旋在其自己的mcs lock上。u16	tail;};
#elsestruct {u16	tail;u16	locked_pending;};struct {u8	reserved[2];u8	pending;u8	locked;};
#endif};
} arch_spinlock_t;

qspinlock由locked、pending和tail的三元组表示。常见状态组合有

  • 0、0、0: 代表初始状态
  • 0、0、1: 代表仅有一个thread持有锁,无等待及MCS队列
  • 0、1、1: 代表锁被持有且存在等待者
  • n、1、1: 代表锁被持有、存在等待者且存在一个MCS队列
  • *、1、1: 代表锁被持有、存在等待者且存在多个MCS队列

状态迁移图如下

在这里插入图片描述

流程

线程以T表示

  1. 初始锁状态为(0, 0, 0),T1直接获取锁成功

    (0, 0, 0) -> (0, 0, 1)

  2. T2获取锁,设置等待位,并等待锁释放

    (0, 0, 1) -> (0, 1, 1)

    在获取锁之后,将会重置等待位,并设置锁

    (0, 1, 1) -> (0, 0, 1)

  3. T3获取锁,由于锁被持有且存在等待,直接进入队列等待锁

  4. 由于队列仅有T3一个等待者,所以一直自旋等待锁非锁持有且等待位的情况出现,也就是锁被释放,且没有等待者

以下是代码详细解释

获取锁时先快速判断是否无任何竞争状态,然后直接加锁

/*** queued_spin_lock - acquire a queued spinlock* @lock: Pointer to queued spinlock structure*/
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{u32 val = 0;// 首先尝试直接获取锁,如果锁可用(lock->val 为 0),则获取锁并返回// 此处对应(0, 0, 0)->(0, 0, 1)if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))return;// 如果锁不可用,则进入慢路径,加入等待队列queued_spin_lock_slowpath(lock, val);
}

接着走入慢路径

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{struct mcs_spinlock *prev, *next, *node;u32 old, tail;int idx;// 检查系统配置和虚拟化情况// ... /** 0,1,0 -> 0,0,1*/// 如果仅有等待者,说明锁被释放,但等待着还未获取锁,则有限等待锁被获取if (val == _Q_PENDING_VAL) {int cnt = _Q_PENDING_LOOPS;val = atomic_cond_read_relaxed(&lock->val,(VAL != _Q_PENDING_VAL) || !cnt--);}// 若除锁状态位以外的其他位被设置,说明有队列等待,或者存在等待位,则直接队列等待if (val & ~_Q_LOCKED_MASK)goto queue;/** trylock || pending** 0,0,* -> 0,1,* -> 0,0,1 pending, trylock*/// 设置等待位val = queued_fetch_set_pending_acquire(lock);// 再次检测竞争状态if (unlikely(val & ~_Q_LOCKED_MASK)) {/* Undo PENDING if we set it. */if (!(val & _Q_PENDING_MASK))clear_pending(lock);goto queue;}/** 0,1,1 -> 0,1,0*/// 处于仅持有状态,说明可以直接去等待持有锁了if (val & _Q_LOCKED_MASK)atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));/** 0,1,0 -> 0,0,1*/// 获取锁。清除pending位,设置lockedclear_pending_set_locked(lock);lockevent_inc(lock_pending);return;queue:lockevent_inc(lock_slowpath);
pv_queue:// 初始化MCS节点// 获取当前CPU上的MCS节点node = this_cpu_ptr(&qnodes[0].mcs);// 增加MCS节点计数器idx = node->count++;// 编码尾指针,包含当前 CPU 的 ID 和 MCS 节点索引tail = encode_tail(smp_processor_id(), idx);// 检查MCS节点数量。如果当前 CPU 的 MCS 节点数量超过最大值 MAX_NODES,则进入自旋锁的等待循环if (unlikely(idx >= MAX_NODES)) {lockevent_inc(lock_no_node);while (!queued_spin_trylock(lock))cpu_relax();goto release;}// 获取MCS节点node = grab_mcs_node(node, idx);/** Keep counts of non-zero index values:*/lockevent_cond_inc(lock_use_node2 + idx - 1, idx);barrier();// 初始化node->locked = 0;node->next = NULL;pv_init_node(node);// 再次尝试直接获取锁if (queued_spin_trylock(lock))goto release;smp_wmb();/*** p,*,* -> n,*,**/// 设置节点到锁尾位中old = xchg_tail(lock, tail);next = NULL;// 如果MCS节点不为空,则获取上一个节点,将当前节点加入等待队列,直到当前节点解锁if (old & _Q_TAIL_MASK) {prev = decode_tail(old);/* Link @node into the waitqueue. */WRITE_ONCE(prev->next, node);pv_wait_node(node, prev);arch_mcs_spin_lock_contended(&node->locked);// 在等待 MCS 锁的时候,尝试提前加载下一个指针并预取它的缓存行,以减少即将进行的 MCS 解锁操作的延迟next = READ_ONCE(node->next);if (next)prefetchw(next);}// 与Paravirtualization有关,暂忽略// ... // 等待锁释放且无等待位val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));locked:// 如果等待者就是当前节点,那么直接重置锁为仅持有锁// (n, 0, 1) -> (0, 0, 1)if ((val & _Q_TAIL_MASK) == tail) {if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))goto release; /* No contention */}// 若MCS队列中还有其他节点,则设置锁位// (n, 0, 0) -> (n, 0, 1)set_locked(lock);// 解除下一个节点的自旋if (!next)next = smp_cond_load_relaxed(&node->next, (VAL));arch_mcs_spin_unlock_contended(&next->locked);pv_kick_node(lock, next);release:// 释放当前cpu的MCS节点// 也就是MCS节点数量递减__this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);/** We must be able to distinguish between no-tail and the tail at 0:0,* therefore increment the cpu number by one.*/static inline __pure u32 encode_tail(int cpu, int idx)
{u32 tail;tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */return tail;
}static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;int idx = (tail &  _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;return per_cpu_ptr(&qnodes[idx].mcs, cpu);
}/*** set_locked - Set the lock bit and own the lock* @lock: Pointer to queued spinlock structure** *,*,0 -> *,0,1*/
static __always_inline void set_locked(struct qspinlock *lock)
{WRITE_ONCE(lock->locked, _Q_LOCKED_VAL);
}/** xchg_tail - Put in the new queue tail code word & retrieve previous one* @lock : Pointer to queued spinlock structure* @tail : The new queue tail code word* Return: The previous queue tail code word** xchg(lock, tail), which heads an address dependency** p,*,* -> n,*,* ; prev = xchg(lock, node)*/
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{/** We can use relaxed semantics since the caller ensures that the* MCS node is properly initialized before updating the tail.*/return (u32)xchg_relaxed(&lock->tail,tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}

解锁就很简单了,直接设置锁位为0就可以了

static inline void queued_spin_unlock(struct qspinlock *lock)
{// Release the lockatomic_set(&lock->val, 0);
}

以下是仿照qspinlock思想所做出的go版本实现

import ("runtime""sync/atomic"
)const (// LOCKED 表示锁被持有LOCKED uint32 = 1 << iota// PENDING 表示锁被持有,但有等待者PENDING// TAIL 表示等待者MCS节点尾部TAIL// LOCKED_PENDING_MASK 表示锁的状态LOCKED_PENDING_MASK = LOCKED | PENDING// PENDING_CLEAR_MASK 表示清除等待标记PENDING_CLEAR_MASK uint32 = ^PENDING// LOCKED_CLEAR_MASK 表示清除锁标记LOCKED_CLEAR_MASK uint32 = ^LOCKED// TAIL_OFFSET 表示尾部索引偏移TAIL_OFFSET = 2// TAIL_MASK 表示尾部索引掩码,用于检查尾部索引是否存在TAIL_MASK uint32 = 0xFFFFFFFC// MCS_UNLOCKED MCS解锁状态MCS_UNLOCKED uint32 = 1// COUNT_ADD_MASK 计数掩码COUNT_ADD_MASK uint64 = 1<<32 | 1// COUNT_INDEX_MASK 计数索引掩码COUNT_INDEX_MASK uint64 = 0xFFFFFFFF// COUNT_SUB_MASK 计数减法掩码COUNT_SUB_MASK uint64 = 0xFFFFFFFF00000000
)type qspinlock struct {val   uint32count uint64
}type qmcsNode struct {locked   uint32next     *qmcsNodereleased bool
}const (maxMcsNodeCount = 16maxTrySetCount  = 1000
)var (qnodes = make([]*qmcsNode, maxMcsNodeCount)
)func init() {for i := 0; i < maxMcsNodeCount; i++ {qnodes[i] = &qmcsNode{}}
}// atomicCondReadAcquire 自旋直到 addr 的值满足 condition
func atomicCondReadAcquire(addr *uint32, condition func(uint32) bool) uint32 {var val uint32for {val = atomic.LoadUint32(addr)if condition(val) {break}runtime.Gosched() // Yield the processor}return val
}// clearPendingSetLocked 清除等待标记,设置锁标记
func clearPendingSetLocked(lock *qspinlock) {for {// 读取当前值old := atomic.LoadUint32(&lock.val)// 如果锁已经被持有,这是不应该出现的情况if old&LOCKED != 0 {panic("clearPendingSetLocked: Lock By Others")}// 计算新值,清除等待标记,设置锁标记val := old & PENDING_CLEAR_MASKval |= LOCKED// 尝试更新变量值if atomic.CompareAndSwapUint32(&lock.val, old, val) {break}// 如果更新失败,继续尝试}
}// trySetPending 尝试设置等待标记
func trySetPending(lock *qspinlock) bool {var old uint32loopCount := maxTrySetCountfor loopCount > 0 {// 若已经被设置等待标记,则说明等待位已经被其他线程设置,直接返回old = atomic.LoadUint32(&lock.val)if old&PENDING != 0 {return false}// 尝试设置等待标记if atomic.CompareAndSwapUint32(&lock.val, old, old|PENDING) {return true}loopCount--}return false
}// setLocked 设置锁标记
func setLocked(lock *qspinlock) {for {// 读取当前值old := atomic.LoadUint32(&lock.val)// 设置锁标记val := old | LOCKED// 如果锁已经被持有,则退出if val == old {panic("setLocked: Lock By Others")}// 尝试更新变量值if atomic.CompareAndSwapUint32(&lock.val, old, val) {break}// 如果更新失败,继续尝试}
}// xchgTail 将尾部索引设置到锁变量中
func xchgTail(lock *qspinlock, tail uint32) uint32 {var old uint32for {// 读取当前值old = atomic.LoadUint32(&lock.val)// 计算新值,将尾部索引设置到变量中val := (old & LOCKED_PENDING_MASK) | tail// 尝试更新变量值if atomic.CompareAndSwapUint32(&lock.val, old, val) {break}// 如果更新失败,继续尝试}return old
}// queuedSpinTryLock 快速尝试获取锁
func queuedSpinTryLock(lock *qspinlock) bool {return atomic.CompareAndSwapUint32(&lock.val, 0, LOCKED)
}// decodeTail 解析尾部索引
func decodeTail(tail uint32) *qmcsNode {idx := tail >> TAIL_OFFSETreturn qnodes[idx-1]
}// encodeTail 编码尾部索引。索引从1开始
func encodeTail(idx uint32) uint32 {return (idx + 1) << TAIL_OFFSET
}// 锁计数以及索引递增
// Attention! 索引递增上限度为1<<32-1
func increaseLockCount(count *uint64) (uint32, uint32) {x := atomic.AddUint64(count, COUNT_ADD_MASK)return uint32((x-1)&COUNT_INDEX_MASK) & (maxMcsNodeCount - 1), uint32(x >> 32)
}// 锁计数递减
func decreaseLockCount(count *uint64) {atomic.AddUint64(count, COUNT_SUB_MASK)
}func (l *qspinlock) Lock() {val := atomic.LoadUint32(&l.val)if val == 0 && queuedSpinTryLock(l) {return}queuedSpinLockSlowPath(l, val)
}func queuedSpinLockSlowPath(lock *qspinlock, val uint32) {var old, tail uint32var node, next *qmcsNodevar success bool// b1 若仅有等待位,则等待非仅有等待位// b2 若仅有加锁位,则进入a逻辑if val == PENDING {val = atomicCondReadAcquire(&lock.val, func(v uint32) bool {return v != PENDING})}if val != LOCKED {goto queue}// a1 若原来仅有加锁位,则尝试设置等待位// a2 等待位设置成功后,等待获取锁,并设置锁位// a3 若等待位设置失败,则队列等待锁success = trySetPending(lock)if success {atomicCondReadAcquire(&lock.val, func(v uint32) bool {return v&LOCKED == 0})clearPendingSetLocked(lock)return}// 队列等待锁// c1. 申请一个MCS节点// c2. 将MCS节点尾部索引设置到锁中// c3. 若设置成功前的MCS节点已经存在,则获取上一个节点,添加到队列中,并等待上一个节点的解锁(主动解锁,或者不存在)// c4. 等待无加锁等待位// c5. 若当前尾节点就是当前节点,则尝试重置为仅有锁位// c6. 若当前尾节点不是当前节点,则获取下一个节点并解锁// c7. 设置锁位
queue:idx, count := increaseLockCount(&lock.count)if count > maxMcsNodeCount {for !queuedSpinTryLock(lock) {runtime.Gosched()}goto release}node = qnodes[idx]node.locked = 0node.next = nilnode.released = falsetail = encodeTail(idx)old = xchgTail(lock, tail)if old&TAIL_MASK != 0 {prev := decodeTail(old)if !prev.released {prev.next = nodeatomicCondReadAcquire(&node.locked, func(v uint32) bool {return v == MCS_UNLOCKED || prev.released})}}val = atomicCondReadAcquire(&lock.val, func(v uint32) bool {return v&LOCKED_PENDING_MASK == 0})if tail == val&TAIL_MASK {if atomic.CompareAndSwapUint32(&lock.val, val, LOCKED) {goto release}}next = node.nextnode.released = trueif next != nil {atomic.StoreUint32(&next.locked, MCS_UNLOCKED)}setLocked(lock)release:decreaseLockCount(&lock.count)
}func (l *qspinlock) Unlock() {for {// 读取当前值old := atomic.LoadUint32(&l.val)// 如果锁未被持有,继续if old&LOCKED == 0 {runtime.Gosched()continue}// 计算新值,清除锁标记val := old & LOCKED_CLEAR_MASK// 尝试更新变量值if atomic.CompareAndSwapUint32(&l.val, old, val) {break}// 如果更新失败,继续尝试}
}

Ref

  1. https://lwn.net/Articles/267968/
  2. https://lwn.net/Articles/852138/
  3. http://www.wowotech.net/kernel_synchronization/queued_spinlock.html
  4. https://elixir.bootlin.com/linux/v5.5-rc2/source

相关文章:

Linux自旋锁

面对没有获取锁的现场&#xff0c;通常有两种处理方式。 互斥锁&#xff1a;堵塞自己&#xff0c;等待重新调度请求自旋锁&#xff1a;循环等待该锁是否已经释放 本文主要讲述自旋锁 自旋锁其实是一种很乐观的锁&#xff0c;他认为只要再等一下下锁便能释放&#xff0c;避免…...

服务器----阿里云服务器重启或关机,远程连接进不去,个人博客无法打开

问题描述 在使用阿里云免费的新加坡服务器时&#xff0c;发现重启或者是关机在开服务器后&#xff0c;就会出现远程连接不上、个人博客访问不了等问题 解决方法 进入救援模式连接主机&#xff0c;用户名是root&#xff0c;密码是自己设置的 点击访问博客查看更多内容...

go 定时任务

在 Go 语言中&#xff0c;可以使用内置的 time 包来实现定时任务。以下是一个简单的示例&#xff1a; go package main import ( "fmt" "time" ) func main() { timer : time.NewTimer(2 * time.Second) <-timer.C fmt.Println(…...

Java Character 类

Java Character 类 Character 类用于对单个字符进行操作。 Character 类在对象中包装一个基本类型 char 的值 char ch a;// Unicode 字符表示形式char uniChar \u039A; // 字符数组char[] charArray { a, b, c, d, e };然而&#xff0c;在实际开发过程中&#xff0c;我们经…...

MQTT协议应用场景

MQTT协议的应用场景非常丰富&#xff0c;特别是在物联网领域。以下是对MQTT协议应用场景的清晰归纳&#xff1a; 1.物联网设备控制和监控&#xff1a;MQTT被广泛应用于物联网设备之间的通信&#xff0c;如智能家居、智能城市和工业自动化等领域。设备可以发布自身状态到特定主题…...

3.4.马氏链-随机游走的常返性

随机游走的常返态 1. 随机游走常返性定义1.1. 随机游走常返值和可能集1.2. 随机游走常返性2. 简单随机游走: 维数与常返性的关系2.1. 简单随机游走2.2. 二维及以下简单随机游走常返, 三维及以上简单随机游走非常返3. 随机游走 ( d ≤ 2 ) (d\leq 2) (d≤2): 常返的充分条件4. 随…...

HOT100与剑指Offer

文章目录 前言一、41. 缺失的第一个正数&#xff08;HOT100&#xff09;二、6. 从尾到头打印链表&#xff08;剑指Offer&#xff09;总结 前言 一个本硕双非的小菜鸡&#xff0c;备战24年秋招&#xff0c;计划刷完hot100和剑指Offer的刷题计划&#xff0c;加油&#xff01; 根…...

【AI开发】CRAG、Self-RAG、Adaptive-RAG

先放一张基础RAG的流程图 https://blog.langchain.dev/agentic-rag-with-langgraph/ 再放一个CRAG和self-RAG的LangChain官方博客 Corrective RAG(CRAG) 首先需要知道的是CRAG的特色发生在retrieval阶段的最后开始&#xff0c;即当我们获得到了近似的document&#xff08;或者…...

FFmpeg中内存分配和释放相关的源码:av_malloc函数、av_mallocz函数、av_free函数和av_freep函数分析

一、av_malloc函数分析 &#xff08;一&#xff09;av_malloc函数的声明 av_malloc函数的声明放在在FFmpeg源码&#xff08;本文演示用的FFmpeg源码版本为5.0.3&#xff0c;该ffmpeg在CentOS 7.5上通过10.2.1版本的gcc编译&#xff09;的头文件libavutil/mem.h中&#xff1a;…...

七天进阶elasticsearch[Four]

依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId>...

数据库-数据定义和操纵-DDL语言的使用

创建一个数据库&#xff1a; create database 数据库名; 选择数据库&#xff1a; use 数据库名; 创建表 create table 表名( ); 添加字段&#xff1b; ALTER TABLE 表名 ADD 新字段名 数据类型 [约束条件] [FIRST|AFTER 已存在字段名] ; 删除字段&#xff1a; ALTER TABLE 表…...

黄金价格与美元的关系变了?

在一些传统的定价框架中&#xff0c;现货黄金的价格走势取&#xff0c;决于美元的实际利率水平——实际利率越高&#xff0c;黄金价格越低&#xff0c;反之亦然。在大多数的时候&#xff0c;美元的实际利率决定了美元指数的高低所以人们通常认为&#xff0c;现货金价与美元呈反…...

VB.net与C# 调用InitializeComponent的区别

VB.NET与C# 调用InitializeComponent的区别 在VB.NET和C#中&#xff0c;InitializeComponent 方法的调用方式有所不同。 C#: 在C#中&#xff0c;InitializeComponent 方法通常是在构造函数中显式调用的。它用于初始化窗体和控件的属性。代码示例如下&#xff1a; public pa…...

【数据结构与算法 刷题系列】求带环链表的入环节点(图文详解)

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《数据结构与算法 经典例题》C语言 期待您的关注 ​ 目录 一、问题描述 二、解题思路 方法一&#xff1a;数学公式推导法 方法…...

独立游戏之路:Tap篇 -- Unity 集成 TapTap 广告详细步骤

Unity 集成 TapADN 广告详细步骤 前言一、TapTap 广告介绍二、集成 TapTap 广告的步骤2.1 进入广告后台2.2 创建广告计划2.3 选择广告类型三、代码集成3.1 下载SDK3.2 工程配置3.3 源码分享四、常见问题4.1 有展现量没有预估收益 /eCPM 波动大?4.2 新建正式媒体找不到预约游戏…...

设计灵感源泉!7个令人赞叹的网页界面设计展示

网页的界面设计主要是指视觉设计和风格设计。高质量的界面更容易吸引用户的注意力&#xff0c;从而更准确地向用户传达信息。对于设计师来说&#xff0c;他们需要从高质量的作品中获得稳定的灵感&#xff0c;以帮助他们更高效地实现设计目标。在本文中&#xff0c;梳理了7个高质…...

vivado PIN

描述 引脚是基元或层次单元上的逻辑连接点。引脚允许 要抽象掉单元格的内容&#xff0c;并简化逻辑以便于使用。引脚可以 是标量的&#xff0c;包含单个连接&#xff0c;或者可以定义为对多个进行分组的总线引脚 信号在一起。 相关对象 引脚连接到一个单元&#xff0c;并且可以…...

docker部署mysql+nginx+redis

部署mysql 1、拉去镜像 docker search mysql docker pull mysql:5.7 2、运行镜像 docker run -p 3306:3306 --name mysql \ -v /home/mysql/log:/var/log/mysql \ -v /home/mysql/data:/var/lib/mysql \ -v /home/mysql/conf:/etc/mysql/conf.d \ -v /home/mysql/mysql-files…...

python文件操作、文件操作、读写文件、写模式

with读取文件数据内容 with open(filepath,mode,encoding) as file:#具体操作,例如&#xff1a;print(file.read())#查看文件所有的内容。 with&#xff1a;Python中的一个上下文管理器&#xff0c;用于简化资源的管理和释放。它可以用于任意需要进行资源分配和释放的情境…...

【亲测可用】docker进入正在运行的容器

微信公众号&#xff1a;leetcode_algos_life&#xff0c;代码随想随记 小红书&#xff1a;412408155 CSDN&#xff1a;https://blog.csdn.net/woai8339?typeblog &#xff0c;代码随想随记 GitHub: https://github.com/riverind 抖音【暂未开始&#xff0c;计划开始】&#xf…...

线程池吞掉异常的case:源码阅读与解决方法

1. 问题背景 有一天给同事CR&#xff0c;看到一段这样的代码 try {for (param : params) {//并发处理&#xff0c;func无返回值ThreadPool.submit(func(param));} } catch (Exception e) {log.info("func抛异常啦,参数是:{}", param) } 我&#xff1a;你这段代码是…...

基于mysqlbinlog恢复数据

1、把binlog转换为SQL mysqlbinlog --base64-outputdecode-rows -vv /usr/local/mysql/log-bin/mysql-bin.000003 >result.sql find / -name result.sql 2、查看events show binlog events in mysql-bin.000003; 3、回滚到3667那一行的数据 mysqlbinlog -v /usr/local/mys…...

Android_Android Studio 常用快捷键 for mac

功能快捷键运行ctrl R优化importctrl opt O格式化opt cmd L自动修正opt enter自动补齐cmd J自动生成代码cmd N搜索使用到的地方fn opt F7 ( cmd)搜索使用到的地方2shift cmd F搜索类cmd O当前文件搜索cmd F全局搜索按两下 shift搜索文件shift cmd O搜索符号opt…...

[EFI]NUC11电脑 Hackintosh 黑苹果efi引导文件

硬件型号驱动情况主板 英特尔 NUC11DBBi9&#xff08;LPC Controller WM590芯片组&#xff09; 处理器 11th Gen Intel Core i9-11900KB 3.30GHz 八核 已驱动内存32 GB ( 三星 DDR4 3200MHz 16GB x 2 )已驱动硬盘三星 MZVL21T0HCLR-00B00 (1024 GB / 固态硬盘)已驱动显卡AMD R…...

在Ubuntu上配置和设置防火墙UFW

在本文我们学习如何在Ubuntu上配置和设置UFW&#xff08;防火墙&#xff09;&#xff0c;UFW代表“不复杂的防火墙”&#xff0c;它充当IPTABLES的接口&#xff0c;从而简化了防火墙的配置过程&#xff0c;对于防火墙来说&#xff0c;这是非常困难的。初学者学习和配置防火墙规…...

nginx安装环境部署(完整步骤)

在部署nginx前&#xff0c;我们需要进行环境的部署 1.编译工具gcc&#xff0c;g,autoconf&#xff0c;automake &#xff0c;make sudo apt-get install gcc g autoconf automake make 2.依赖库zlib&#xff0c;openssl&#xff0c;pcre 2.1 openssl下载地址 https://www.open…...

如何做电子骑缝章?

制作电子骑缝章的过程可以依据不同情况和所使用的工具而有所不同&#xff0c;但基本思路是确保印章能够跨过页面接缝&#xff0c;以验证文档的完整性。以下是几种常见的方法&#xff1a; 使用专业电子合同平台 选择平台&#xff1a;首先&#xff0c;确保你使用的电子合同平台…...

2024.6.13 bailuo-Docker 安装与镜像拉取

2024.6.13 bailuo-Docker 安装与镜像拉取 2024.6.12 bailuo-安装与镜像拉取 卸载 Docker 如果已安装旧版 Docker 则先卸载 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-en…...

【Java开发规范】IDEA 设置 text file encoding 为 UTF-8,且文件的换行符使用 Unix 格式

1. IDEA 设置 text file encoding 为 UTF-8 file -> settings -> editor -> code style -> file encoding Transparent-native-to-asci conversion 要不要勾选&#xff1f;> 不推荐勾选&#xff08;它的作用是用来自动转换ASCII编码&#xff0c;防止文件乱码&am…...

使用`LD_PRELOAD`和`jemalloc`实现C/C++信号的内存堆栈信息收集

文章目录 0. 概要1. 编译jemalloc2. 编译钩子共享库liballoc_hook.so3. 使用LD_PRELOAD加载钩子库liballoc_hook.so测试3.1 设置环境变量3.2 使用LD_PRELOAD加载钩子库并运行程序3.3 发送SIGUSR1信号以触发堆栈信息打印3.4 使用jeprof解析heap堆栈信息文件 4. 示例程序example.…...

一个网站的建设流程有哪些资料/网站流量宝

2019独角兽企业重金招聘Python工程师标准>>> 1. lucene 简介 Apache Lucene 是一个全文检索引擎&#xff0c;它不是一个完整的应用程序&#xff0c;但它提交的API可以很容易的搭建一个具有检索能力的应用。DownLoad: http://www.apache.org/dyn/closer.lua/lucene…...

网站站点连接不安全/百度搜索引擎的特点

-- 十年一顾&#xff0c; iOS 与 Android 这样改变了我们- http://blog.csdn.net/csdnnews/article/details/78217466 进行图像、视频相关应用的开发&#xff0c;不仅仅关注普通应用层的开发技能&#xff0c;对图像视频领域的内容也有了更多的关注。最近从零学习移动端的 Op…...

帮客户做网站内容/北京seo优化厂家

双人3D坦克实现 player1: WSAD控制上下左右   空格键发射炮弹 player2: IKJL可控制上下左右  B键发射炮弹 每个坦克只有100hp,子弹击中1次扣30hp&#xff0c;hp时时显示在坦克上 当一辆坦克hp低于0时&#xff0c;游戏结束 Main Camera聚焦两辆坦克中心点    游戏项目已…...

学wordpress不需要学DW/seo项目

为了测试其中的 PROVISION 接口&#xff0c;利用了 System.Net.HttpWebRequest 类将《MISC系统短信SP接入指南&#xff0d;接口改造分册》文档中的示例 xml 发送到了 WEB 服务&#xff0c;并从 WEB 服务返回了对应的 Resp 包&#xff08;也是一段 xml&#xff09;&#xff0c;下…...

预约营销型网站建设专家/成人短期就业培训班

敢在今年提裸辞的人&#xff0c;都是勇士。 这是最近笔者在就业话题下看到得最多的一句话。几年前&#xff0c;逃离北上广深的话题火了&#xff0c;也随之而来带起了一股裸辞的风潮。“世界这么大&#xff0c;我想去看看。”成为了年轻人追求自由的目标。 而今年&#xff0c;…...

可以做网络攻防的实验的网站/品牌推广的具体方法

Microsoft Office Visio绘画双箭头直线的具体步骤介绍作者:小鑫 来源:PC下载网时间:2019-07-19 15:27:03你们的工作中是不是也离不开Microsoft Office Visio呢?不过你们了解Microsoft Office Visio是怎么绘画双箭头直线吗?以下文章就为你们带来了Microsoft Office Visio绘画双…...