11-pg内核之锁管理器(六)死锁检测
概念
每个事务都在等待集合中的另一事务,由于这个集合是一个有限集合,因此一旦在这个等待的链条上产生了环,就会产生死锁。自旋锁和轻量锁属于系统锁,他们目前没有死锁检测机制,只能靠内核开发人员在开发过程中谨慎的使用锁,避免死锁发生。
数据库中的常规锁,对申请锁的顺序没有严格的限制,在申请锁时也没有严格的查验,因此不可避免的就会产生死锁。pg数据库采用一种全局死锁检测的方法就是,每个进程启动后会启动一个定时器,定时调用死锁检测函数,检测是否有死锁产生,如果有死锁则进行相应的处理来解除死锁。
实边
在常规锁的申请过程中,假设A事务持有表的共享锁或排它锁,当时事务B申请表的排他锁时,就需要进入等待状态,即有等待状态B->A,我们称这种等待边为实边。它主要出现在等待者和持锁者之间。
虚边
假设事务A持有共享锁,事务B要申请排他锁,因为与事务A冲突,那么事务B就需要进入等待队列,如果此时又有事务C要申请共享锁,虽然事务C与事务A并不冲突,但是事务C与等待队列中的事务B要持有的排他锁冲突,所以事务C也要进入等待队列中,此时的冲突关系C->B ,我们称他们之间等待的边为虚边。它主要出现在等待队列的等待者之间的关系。
环
在数据库中,假设事务A需要等待事务B释放锁后才能执行,这样的等待关系我们称之为边,边是有向的。实边和虚边是边的两种特殊情况。假设一个锁的等待队列中的所有的事务,他们之间的等待关系都用边来表示,这样构成关系图实际上就是一种有向图。如果有向图中出现了环,我们就可以判断出现了死锁。如果环的每个边都是实边,那么就是出现了实边死锁,实边死锁只能通过杀掉其中一个进程来断开环从而解锁。 如果环中存在虚边,那么出现的就是虚边死锁,就可以通过调整等待队列的顺序来尝试断开环,从而解锁。
- 假设有A、B、C三个事务,有Lock1、Lock2两把锁,其中事务A等待Lock1的排它锁;事务B持有Lock1的共享锁,等待Lock2的共享锁;事务C持有Lock 2的排它锁,等待Lock1的共享锁。
- 对于Lock1来说,事务B持有其共享锁,事务A等待Lock1的排它锁,与事务B形成实边,事务C等待Lock1的共享锁,与事务B并不冲突,但是与事务A冲突,所以与事务A形成虚边。
- 对于Lock2来说,事务C持有Lock2的排它锁,事务B在等待Lock2的共享锁,与事务C冲突,形成实边。
- 最终得到的等待关系图,如上图,形成了一个包含虚边的环,即虚边死锁;即事务A在等待事务B释放Lock1,事务B在等待事务C释放Lock2,而事务C在等待事务A释放Lock1,从而形成了死锁
- 要解除上面的虚边死锁,可以对虚边进行拓扑排序,调整虚边的等待关系,即将Lock1等待队列上的C–>A,改成A–>C,这样得到的等待关系图,就不存在环了,死锁就不存在了。事务A在等待事务C释放Lock1,事务B在等待事务C释放Lock2,事务C可以直接获取Lock1的共享锁,执行完之后就可以释放Lock2的排他锁,释放后,事务A获取Lock1锁,事务B获取Lock2锁,都不再阻塞。
拓扑排序
当存在虚边构成的环时,会通过重排等待队列的方式尝试断开环从而解锁。重排的方式就是通过拓扑排序实现。
拓扑排序就是在一个有向无环图(DAG),将所有的点 排成一个线性的序列,使得每条有向边的起点都排在终点的前面。
拓扑排序遵循的原理:
1) 在图中选择一个没有前驱的定点V
2) 从图中删除顶点V和所有以该顶点为尾的弧。
如下图:
- 找到没有前驱的定点V1
- 删除V1及以V1作为起点的边
- 继续查找没有前驱的顶点,此时V2和V3都符合要求,随机选择一个,这里选择V2
- 删除V2和以V2作为起点的边
- 继续查找没有前驱的顶点,V3符合,选择V3
- 删除V3以及以V3作为起点的边
- 剩余V4,排序结束。
最终得到的拓扑排序结果就有两种:
V1->V2->V3->V4
V1->V3->V2->V4
死锁检测函数中调用TopoSort函数实现对包含虚边的环的拓扑排序。
死锁检测相关的结构体和全局变量
结构体
EDGE
等待关系图中的一条边。
等待者(waiter)和阻塞者(blocker)可能是锁组的成员,也可能不是,但如果它们中任何一个属于锁组,那它将是锁组的领导者而非锁组中的其他成员。即便这些特定进程根本无需等待,锁组的领导者也充当整个组的代表。等待者的锁组中至少有一个成员在给定锁的等待队列上,甚至可能更多。
typedef struct
{PGPROC *waiter; /* 等待者*/PGPROC *blocker; /* 被等待者or */LOCK *lock; /* 等待的锁*/int pred; /* 拓扑排序使用的额外变量 */int link; /* 拓扑排序使用的额外变量*/
} EDGE;
WAIT_ORDER
等待队列,如果死锁检测处有虚边死锁,则会尝试通过调整等待队列来尝试消除死锁,调整时新的等待队列就保存到waitOrders数组中,数组中每个元素就是一个等待队列,由WAIT_ORDER结构体保存其相关信息。
typedef struct
{LOCK *lock; /* 等待的锁 */PGPROC **procs; /* 在lock上的新的等待队列 */int nProcs; /* 等待队列的长度 */
} WAIT_ORDER;
DEADLOCK_INFO
死锁相关的信息
typedef struct
{LOCKTAG locktag; /* 死锁的锁tag信息*/LOCKMODE lockmode; /* 等待的锁的模式*/int pid; /* 阻塞的进程号*/
} DEADLOCK_INFO;
全局变量
- got_deadlock_timeout: 全局死锁检测标志位,为true时触发死锁死锁检测。
- nCurConstraints: 当前检测到的边的数量
- curConstraints: 当前已被检测的虚边的信息,数组中保存,拓扑排序时就对这个数组进行排序。
- maxCurConstraints: 允许的最大的边数量
- nPossibleConstraints: 可能的边的数量
- possibleConstraints: 可以被调整的虚边的信息,数组中保存
- nWaitOrders: 新的等待队列的数量
- waitOrders: 新的等待队列的信息,在查找环的过程中,会将对应的等待边放到该队列中,方便进行重新排列。大小是进程数的1/2,因为一个边就有2个等待进程。
- blocking_autovacuum_proc: 被vacuum阻塞的进程
- nVisitedProcs: 访问到的进程的数量
- visitedProcs: 访问到的进程信息,数组中保存,通过该数组判断是否存在环,比如如果一个进程在数组中重复出现则就构成了环。
- nDeadlockDetails: 死锁详细信息的数量
- deadlockDetails: 死锁详细信息,数组中保存
- beforeConstraints:记录每个进程需要在多少其他进程之前
- afterConstraints:则间接通过链表头记录每个进程需要在哪些进程之后。
死锁检查流程及相关函数
注册死锁检测定时器
在每个进程启动时,会注册一个死锁检测定时器,回调函数为CheckDeadLockAlert,当DEADLOCK_TIMEOUT时间超时(默认是1秒)时,就会调用CheckDeadLockAlert函数,该函数内会将全局变量got_deadlock_timeout设为true.
if (!bootstrap){RegisterTimeout(DEADLOCK_TIMEOUT, CheckDeadLockAlert);RegisterTimeout(STATEMENT_TIMEOUT, StatementTimeoutHandler);RegisterTimeout(LOCK_TIMEOUT, LockTimeoutHandler);RegisterTimeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,IdleInTransactionSessionTimeoutHandler);RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler);RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler);}
进程在等待锁时会调用WaitOnLock函数等待,该函数又回调用procsleep函数,它里面就会根据该变量判断是否需要进行死锁检测。
if (got_deadlock_timeout){CheckDeadLock();got_deadlock_timeout = false;}
InitDeadLockChecking
每个backend进程启动后调用,初始化死锁检测相关的全局变量, maxBackend为进程的最大数量(max_connections),其中死锁检测相关的全局变量初始化的大小为:
MaxBackends = MaxConnections + autovacuum_max_workers + 1 +
max_worker_processes + max_wal_senders = 100 + 3 + 1 + 8 + 10 = 122(默认值)
全部变量名 | 长度 |
---|---|
visitedProcs | MaxBackends |
deadlockDetails | MaxBackends |
topoProcs | MaxBackends |
beforeConstraints | MaxBackends |
afterConstraints | MaxBackends |
waitOrders | MaxBackends / 2 |
waitOrderProcs | MaxBackends |
maxCurConstraints | MaxBackends |
curConstraints | MaxBackends |
maxPossibleConstraints | MaxBackends * 4 |
possibleConstraints | MaxBackends * 4 |
visitedProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));//访问过的进程数组,最大为maxbankenddeadlockDetails = (DEADLOCK_INFO *) palloc(MaxBackends * sizeof(DEADLOCK_INFO));//死锁详细信息,最大为maxBackends/*拓扑排序用*/topoProcs = visitedProcs; /* re-use this space */beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));afterConstraints = (int *) palloc(MaxBackends * sizeof(int));waitOrders = (WAIT_ORDER *)palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));//等待队列数组,长度为MaxBackends/2waitOrderProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));//等待队列进程,最大为MaxbackendsmaxCurConstraints = MaxBackends;curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));//探索的边的信息,最大为MaxBackends,设置过大会导致嵌套深度过大导致堆栈溢出/** 允许最多保存3*MaxBackends个约束而无需重新运行TestConfiguration。(这可能已经绰绰有余,但即使空间不足,我们也可以通过每次需要时重新运行TestConfiguration来重新计算约束列表以应对。)possibleConstraints[]中的最后MaxBackends个条目被预留作为FindLockCycle的输出工作区。*/maxPossibleConstraints = MaxBackends * 4;possibleConstraints =(EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
CheckDeadLock
这是死锁检测的入口函数,死锁检测操作就是由该函数实现。由于死锁检测是互斥的,所以死锁检测期间锁表不允许被修改。但是如果一个事务只是通过本地锁表或通过FastPath就能获得锁,则它不受死锁检测的影响。
- 以排他模式锁住主锁表
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);//以排他模式锁住主锁表
- 调用DeadLockCheck函数检测死锁
deadlock_state = DeadLockCheck(MyProc);//检测死锁
- 如果产生了实边死锁,将当前进程从等待队列中删除
if (deadlock_state == DS_HARD_DEADLOCK)//产生实边死锁{RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)));//将当前进程从等待队列中删除}
- 释放主锁表
for (i = NUM_LOCK_PARTITIONS; --i >= 0;)LWLockRelease(LockHashPartitionLockByIndex(i));//释放所有的锁
DeadLockCheck
检查给定的进程上是否产生了死锁,如果是虚边死锁,会通过调整等待队列顺序来尝试解决死锁,如果无法解决,就会返回DS_HARD_DEADLOCK,死锁的详细信息会保存到deadlockDetails[]中。
- 死锁检测的起点,先初始化全局变量 //死锁检测开始位置,初始化边相关的全局变量如:nCurConstraints,nPossibleConstraints ,nWaitOrders,blocking_autovacuum_proc
//死锁检测开始位置,初始化边相关的全局变量nCurConstraints = 0;nPossibleConstraints = 0;nWaitOrders = 0;/* Initialize to not blocked by an autovacuum worker */blocking_autovacuum_proc = NULL;
- 调用DeadLockCheckRecurse函数查找环,如果找到实边死锁,直接诶返回死锁
//查找死锁(环)if (DeadLockCheckRecurse(proc)){int nSoftEdges;TRACE_POSTGRESQL_DEADLOCK_FOUND();nWaitOrders = 0;if (!FindLockCycle(proc, possibleConstraints, &nSoftEdges))//再次检查一下死锁是否消失,若消失表名有错elog(FATAL, "deadlock seems to have disappeared");return DS_HARD_DEADLOCK; /* 发现实边死锁*/}
- 遍历每个等待队列,将对应锁的等待进程重新添加到锁的等待队列中,并尝试唤醒一些可唤醒的进程
for (i = 0; i < nWaitOrders; i++)//遍历每个等待队列{LOCK *lock = waitOrders[i].lock;PGPROC **procs = waitOrders[i].procs;int nProcs = waitOrders[i].nProcs;PROC_QUEUE *waitQueue = &(lock->waitProcs);ProcQueueInit(waitQueue);//初始化等待队列for (j = 0; j < nProcs; j++){SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));//头插法插入到队列中waitQueue->size++;}ProcLockWakeup(GetLocksMethodTable(lock), lock);//查看是否可以唤醒一些进程}
- 如果nWaitOrders不等于0,表明还有虚边死锁,返回DS_SOFT_DEADLOCK
- 如果是被vacuum进程阻塞住,返回DS_BLOCKED_BY_AUTOVACUUM
- 其他情况表明无死锁,返回DS_NO_DEADLOCK
if (nWaitOrders > 0)//有虚边死锁return DS_SOFT_DEADLOCK;else if (blocking_autovacuum_proc != NULL)//被vacuum阻塞return DS_BLOCKED_BY_AUTOVACUUM;elsereturn DS_NO_DEADLOCK;//无死锁
DeadLockCheckRecurse
DeadLockCheckRecurse
函数是一个递归过程,旨在深入探索并找出有效的执行顺序以避免死锁情况。该函数的主要目的是通过递归的方式检测系统中的死锁状况,并尝试找出一个无死锁的执行顺序。它在多进程或多线程环境中特别有用,尤其是在涉及到资源共享和锁机制的情况下。
-
参数说明:
curConstraints[]
:这是一个数组,用于保存当前递归层级正在探索的边。随着递归的深入,每发现一个新的循环(即潜在的死锁条件),就会将相应的边添加到这个数组中。waitOrders[]
:这个数组用于记录需要调整的锁等待队列,以达到一个无死锁的状态。如果存在需要调整的队列,则通过这个数组指示出来。
-
返回值:
- 如果函数返回
true
,这意味着经过当前递归层次的探索,发现无法找到任何解决方案来避免死锁,即系统处于或即将进入死锁状态。 - 如果返回
false
,则意味着已经找到了一种无死锁的执行顺序或调整策略,使得所有进程或线程可以在不发生死锁的情况下继续执行。此时,waitOrders[]
中会包含如何重新排列锁等待队列的具体指导,以实现这一目标。
- 如果函数返回
-
工作原理:
- 该函数通过递归地检查当前的资源分配和锁等待关系,识别出所有可能形成环(即死锁的前提条件)的情况。
- 对于每一个识别到的环,函数尝试添加或调整虚边(即改变某些进程的等待顺序或优先级),以打破潜在的死锁链。
- 这个过程持续进行,直到所有可能的死锁情况都被探索完毕,或者找到了一个有效的无死锁执行方案。
执行流程如下
-
调用TestConfiguration检测当前的边的有效性,会将探测到的虚边保存到curConstraints数组,并返回探测到的虚边数量。
nEdges = TestConfiguration(proc);//测试边的有效性,返回探测到的虚边数量
- 如果返回的虚边数量小于0,表明有实边死锁
- 如果返回的虚边数量等于0,表明无死锁
- 如果当前探测到的nCurConstraints大于maxCurConstraints,表明超出存储限制了
if (nEdges < 0)//有实边死锁return true; /* hard deadlock --- no solution */if (nEdges == 0)//无死锁return false; /* good configuration found */if (nCurConstraints >= maxCurConstraints)//边数量超出限制return true; /* out of room for active constraints? */
- 判断PossibleConstraints中是否有空间,若无空间的话,就没必要保存探测到的虚边了
if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)//有 空间保存可能的边{nPossibleConstraints += nEdges;//其实边已经存在possibleConstraints地址后了,只不过nPossibleConstraints没更新就会忽略而已savedList = true;}else//无空间保存{savedList = false;}
- 将探测到的虚边保存到curConstraints,然后递归调用该函数,判断是否有死锁
curConstraints[nCurConstraints] = possibleConstraints[oldPossibleConstraints + i];//将探测到的边保存到curConstraintsnCurConstraints++;
if (!DeadLockCheckRecurse(proc))//重新检测是否有死锁return false; /* found a valid solution! */
TestConfiguration
测试当前配置的有效性。该函数首先会调用ExpandConstraints函数尝试对等待队列进行重排来尝试解除虚边死锁;然后继续查找是否存在环,如果存在就将虚边保存到possibleConstraints数组中。
- 定义查保存虚边的地址,即possibleConstraints数组
EDGE *softEdges = possibleConstraints + nPossibleConstraints;//探测到的虚边都在此地址存放
- 判断possibleConstraints数组是否还有空间
if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)return -1;
- 尝试调整等待队列
if (!ExpandConstraints(curConstraints, nCurConstraints))//尝试调整等待队列来解除虚边死锁return -1;
- 遍历每个探测到的边,尝试根据每个边的waiter和blocker进程查找环,并将找到的环中的虚边保存到softEdges中
for (i = 0; i < nCurConstraints; i++){if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))//查询等待者进程是否存在环{if (nSoftEdges == 0)return -1; /* hard deadlock detected */softFound = nSoftEdges;}if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))//查询被等待者进程是否存在环{if (nSoftEdges == 0)return -1; /* hard deadlock detected */softFound = nSoftEdges;}}
- 检测当前进程是否存在环。
if (FindLockCycle(startProc, softEdges, &nSoftEdges))//检测当前进程是否存在环 {if (nSoftEdges == 0)return -1; softFound = nSoftEdges;}
FindLockCycleRecurse
递归查找是否存在环,查找的原理就是:在查找环时,先检查待测进程是否在visitedProcs数组中出现过,如果没出现过,就将待测进程存入到visitedProcs数组中,如果出现过,而且是在等待队列的起始处,则表明出现了死锁的环,返回死锁信息。
例如:
-
待测进程在visitedProcs数组中未出现过,没有环,无死锁
-
待测进程在visitedProcs数组中出现过,存在环,但是不在起始处,对当前进程而言不算死锁
-
待测进程在visitedProcs数组中出现过,存在环,且在起始处,存在死锁
- 判断是否出现环
/*判断是否有环,遍历visitedProcs数组,如果检查的proc在数组中出现过,且是当前的进程,表明出现了环*/for (i = 0; i < nVisitedProcs; i++){if (visitedProcs[i] == checkProc)//进程重复出现{if (i == 0)//是待检测的进程,出现了环{nDeadlockDetails = depth;return true;}return false;}}
- 如果不存在环,将待测进程保存到visitedProcs数组中
visitedProcs[nVisitedProcs++] = checkProc;//没有检测到环,将检测进程存入visitedProcs数组
- 如果要检查的进程处于等待状态,那么就递归检测他的等待队列
if (checkProc->links.next != NULL && checkProc->waitLock != NULL &&FindLockCycleRecurseMember(checkProc, checkProc, depth, softEdges,nSoftEdges))return true;
- 如果待测进程是锁组中的一部分,遍历锁组的每个成员进程,检查是否存在环
如果进程没有等待,但是是锁组的一部分,还是有可能出现等待依赖边,尽管这个进程本身没有等待。*/dlist_foreach(iter, &checkProc->lockGroupMembers)//遍历每个group成员{PGPROC *memberProc;memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);if (memberProc->links.next != NULL && memberProc->waitLock != NULL &&memberProc != checkProc &&FindLockCycleRecurseMember(memberProc, checkProc, depth, softEdges,nSoftEdges))//递归检测是否有环return true;}
FindLockCycleRecurseMember
递归检查是否存在环。
- 获取锁的进程锁表,即锁模式冲突掩码
lockMethodTable = GetLocksMethodTable(lock);//获取锁方法numLockModes = lockMethodTable->numLockModes;//获取锁模式数量conflictMask = lockMethodTable->conflictTab[checkProc->waitLockMode];//获取与等待的锁模式冲突的掩码
- 遍历检查锁的等待队列的每个进程,如果待测进程等待的锁与当前持有的锁模式冲突,递归调用FindLockCycleRecurse函数检查是否存在环,如果存在返回实边死锁信息。
procLocks = &(lock->procLocks);//获取进程锁表proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,offsetof(PROCLOCK, lockLink));while (proclock)//遍历检查进程的等待队列的每个进程是否有死锁{PGPROC *leader;proc = proclock->tag.myProc;leader = proc->lockGroupLeader == NULL ? proc : proc->lockGroupLeader;if (leader != checkProcLeader)//同组的不检查{for (lm = 1; lm <= numLockModes; lm++)//遍历每一个锁模式{if ((proclock->holdMask & LOCKBIT_ON(lm)) &&(conflictMask & LOCKBIT_ON(lm)))//如果出现锁冲突,持有的锁与当前进程要等的锁模式冲突,实边{if (FindLockCycleRecurse(proc, depth + 1,softEdges, nSoftEdges))//递归检查{DEADLOCK_INFO *info = &deadlockDetails[depth];//有死锁,填充死锁相关信息info->locktag = lock->tag;info->lockmode = checkProc->waitLockMode;info->pid = checkProc->pid;return true;}if (checkProc == MyProc &&proc->statusFlags & PROC_IS_AUTOVACUUM)//没有死锁,但是判断是否有autovacuum进程阻塞我们blocking_autovacuum_proc = proc;break;}}}proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,offsetof(PROCLOCK, lockLink));}
- 找到当前锁的等待队列waitOrders[i],然后遍历等待队列的每个进程,如果遍历的进程等待的锁模式与待测进程锁模式冲突,递归调用FindLockCycleRecurse函数,检查是否存在环,如果存在,则为虚边死锁,将虚边保存到possibleConstraints数组中
for (i = 0; i < nWaitOrders; i++)//从等待队列中找到当前锁所在的等待队列{if (waitOrders[i].lock == lock)break;}if (i < nWaitOrders)//判断是否找到对应的等待队列{PGPROC **procs = waitOrders[i].procs;queue_size = waitOrders[i].nProcs;for (i = 0; i < queue_size; i++)//遍历等待队列中的每个进程{PGPROC *leader;proc = procs[i];leader = proc->lockGroupLeader == NULL ? proc :proc->lockGroupLeader;if (leader == checkProcLeader) break;if ((LOCKBIT_ON(proc->waitLockMode) & conflictMask) != 0)//等待的锁模式与待测进程的锁模式判断是否存在冲突{/* This proc soft-blocks checkProc */if (FindLockCycleRecurse(proc, depth + 1,softEdges, nSoftEdges))//递归检查是否存在虚边环{/* fill deadlockDetails[] */DEADLOCK_INFO *info = &deadlockDetails[depth];//记录虚边死锁信息info->locktag = lock->tag;info->lockmode = checkProc->waitLockMode;info->pid = checkProc->pid;/*即添加到possibleConstraints数组*/Assert(*nSoftEdges < MaxBackends);softEdges[*nSoftEdges].waiter = checkProcLeader;//保存虚边信息到即添加到possibleConstraints数组数组softEdges[*nSoftEdges].blocker = leader;softEdges[*nSoftEdges].lock = lock;(*nSoftEdges)++;return true;}}}}
- 如果waitOrders[i]中不存在当前锁的等待队列,找到该锁组的最后一个进程,检查他的等待队列的每个进程,如果遍历的进程等待的锁模式与待测进程锁模式冲突,调用FindLockCycleRecurse函数,检查是否存在环,如果存在,则为虚边死锁,将虚边保存到possibleConstraints数组中
else//等待队列中没找到当前进程{PGPROC *lastGroupMember = NULL;waitQueue = &(lock->waitProcs);//查找锁组的最后一个成员if (checkProc->lockGroupLeader == NULL)lastGroupMember = checkProc;else{proc = (PGPROC *) waitQueue->links.next;queue_size = waitQueue->size;while (queue_size-- > 0){if (proc->lockGroupLeader == checkProcLeader)lastGroupMember = proc;proc = (PGPROC *) proc->links.next;}}queue_size = waitQueue->size;proc = (PGPROC *) waitQueue->links.next;while (queue_size-- > 0)//遍历等待队列,查找虚边冲突{PGPROC *leader;leader = proc->lockGroupLeader == NULL ? proc :proc->lockGroupLeader;/* Done when we reach the target proc */if (proc == lastGroupMember)break;if ((LOCKBIT_ON(proc->waitLockMode) & conflictMask) != 0 &&leader != checkProcLeader)//锁冲突{if (FindLockCycleRecurse(proc, depth + 1,softEdges, nSoftEdges))//检查是否有虚边锁{DEADLOCK_INFO *info = &deadlockDetails[depth];//报错虚边死锁信息info->locktag = lock->tag;info->lockmode = checkProc->waitLockMode;info->pid = checkProc->pid;softEdges[*nSoftEdges].waiter = checkProcLeader;//保存虚边信息到即添加到possibleConstraints数组数组softEdges[*nSoftEdges].blocker = leader;softEdges[*nSoftEdges].lock = lock;(*nSoftEdges)++;return true;}}proc = (PGPROC *) proc->links.next;}}
- 无死锁
ExpandConstraints
将边CurConstraints扩展为对受影响等待队列的新排序
即将CurConstraints中的每个边加入到等待队列中,然后对等待队列进行拓扑排序
for (i = nConstraints; --i >= 0;)//遍历每个虚边死锁的边{LOCK *lock = constraints[i].lock;for (j = nWaitOrders; --j >= 0;)//确认等待队列中是否已经有了它{if (waitOrders[j].lock == lock)break;}if (j >= 0)//没遍历完,继续continue;//存入等待队列中waitOrders[nWaitOrders].lock = lock;waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;nWaitOrderProcs += lock->waitProcs.size;if (!TopoSort(lock, constraints, i + 1,waitOrders[nWaitOrders].procs))//进行拓扑排序return false;nWaitOrders++;}
TopoSort
当存在虚边环时,会对每个虚边调用该函数对等待队列进行重新排序,从而尝试解开虚边环。
- 将要处理的Lock的等待队列存入topoProcs数组中
proc = (PGPROC *) waitQueue->links.next;for (i = 0; i < queue_size; i++)//将锁的等待队列中的每个进程都保存到topoProcs数组中{topoProcs[i] = proc;proc = (PGPROC *) proc->links.next;}
- 初始化beforeConstraints和afterConstraints,这两个数组与topoProcs长度相等,下面要用
MemSet(beforeConstraints, 0, queue_size * sizeof(int));//初始化这俩变量,下面要用MemSet(afterConstraints, 0, queue_size * sizeof(int));
- 遍历每一个虚边
- 判断虚边的waiter或他的groupleader是否在等待队列中,如果在里面,根据其在等待队列中的位置将beforeConstraints对应位置值+1;如果是锁组的成员,对应值置为-1;如果不在里面,跳过当前虚边,继续下一循环
- 如果虚边的blocker或他的groupleader是否在等待队列中,如果在里面,根据其在等待队列中的位置将afterConstraints对应位置值置为其下标值+1
- 将该虚边的pred的值置为waiter在等待队列中的位置下标,link置为afterConstraints对应位置的值。
for (i = 0; i < nConstraints; i++)//遍历每一个边{proc = constraints[i].waiter;Assert(proc != NULL);jj = -1;for (j = queue_size; --j >= 0;)//遍历等待队列中的每个进程,与虚边的waiter对比{PGPROC *waiter = topoProcs[j];if (waiter == proc || waiter->lockGroupLeader == proc){//如果waiter等于等待队列中的某个进程或他的组长,表示是一个该等待队列相关的边Assert(waiter->waitLock == lock);if (jj == -1)//是第一个,将jj标记为该进程在等待队列中的位置jj = j;else//其他的锁组成员标记为-1{Assert(beforeConstraints[j] <= 0);beforeConstraints[j] = -1;}}}if (jj < 0)//没有相关的等待者,表名与当前锁无关continue;/*同理,判断被等待者进程*/proc = constraints[i].blocker;Assert(proc != NULL);kk = -1;for (k = queue_size; --k >= 0;)//遍历等待队列中的每个进程,与虚边的blocker对比{PGPROC *blocker = topoProcs[k];if (blocker == proc || blocker->lockGroupLeader == proc){//如果blocker等于等待队列中的某个进程或他的组长,表示是一个该等待队列相关的边Assert(blocker->waitLock == lock);if (kk == -1)//是第一个,将kk标记为该进程在等待队列中的位置kk = k;else//其他的锁组成员标记为-1{Assert(beforeConstraints[k] <= 0);beforeConstraints[k] = -1;}}}if (kk < 0)//没有匹配的边,说明与当前锁无关continue;Assert(beforeConstraints[jj] >= 0);beforeConstraints[jj]++; /* 如果waiter进程在topoProcs中,这里就+1*/constraints[i].pred = jj;//保存虚边的water在等待队列中的位置constraints[i].link = afterConstraints[kk];//指向afterConstraintsafterConstraints[kk] = i + 1;}
- 开始进行拓扑排序,遍历每个等待队列中的进程
- 从等待队列的尾部开始遍历,找到第一个非空的进程
- 如果进程非空且其在beforeConstraints对应位置的值为0时,满足排序要求,将进程存入到waiterOrder队列中,然后将topoProcs中的对应位置置为空。
- 更新beforeConstraints中的值。
last = queue_size - 1;for (i = queue_size - 1; i >= 0;)//遍历topoProcs数组,反向遍历{int c;int nmatches = 0;while (topoProcs[last] == NULL)//找到第一个非空的进程last--;for (j = last; j >= 0; j--){if (topoProcs[j] != NULL && beforeConstraints[j] == 0)//第一个不在虚边的break;}if (j < 0)//遍历完也没有不在虚边的return false;proc = topoProcs[j];if (proc->lockGroupLeader != NULL)//如果是锁组,获取其组长proc = proc->lockGroupLeader;Assert(proc != NULL);for (c = 0; c <= last; ++c){if (topoProcs[c] == proc || (topoProcs[c] != NULL &&topoProcs[c]->lockGroupLeader == proc))//当前进程或当前组长进程{ordering[i - nmatches] = topoProcs[c];//存入ordering数组即waitOrders[xx].procstopoProcs[c] = NULL;++nmatches;}}Assert(nmatches > 0);i -= nmatches;for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)beforeConstraints[constraints[k - 1].pred]--;}
-
查找环时找到一个虚边环,虚边为D->A, 虚边保存在CurContraints
-
针对该虚边,遍历waiterOrders数组中的每个等待队列,并进行拓扑排序,这里假设有等待队列如下图
-
遍历等待队列中的每个进程并与边进行比较,将符合要求的信息存入beforeConstraints和afterConstraints,并更新curContraints的pred和link字段
-
然后对等待队列进行排序,排序后的队列为
-
这样警告拓扑排序后的等待关系就变成了,下图,环被消除了,从而解决了死锁。
死锁检测示例
下面是根据上面的例子,得到的函数运行流程
- 进程:A,B, C
- 锁Lock1,Lock2
- Lock1锁等待关系: C->A->B
- Lock2锁等待关系: B ->C
- 假设当前进程是A触发的死锁检测。
死锁检测流程如下:
* DeadLockCheck
** DeadLockCheckRecurse
*** TestConfiguration(A)
**** FindLockCycle(A,NULL,0)
***** nVisitedProcs = 0;
***** nDeadlockDetails = 0;
****** possibleConstraints = 0;
***** FindLockCycleRecurse(A,0,NULL,0)
****** visitedProcs[0] = A
****** FindLockCycleRecurseMember(A ,A ,0,NULL,0) //事务A等待的Lock1锁的等待队列
****** Lock1的等待队列是:C->A
******* 无实边冲突
******* nWaitOrders=0
******* 获取Lock1的等待队列:A<-C
******* FindLockCycleRecurse(C,1,NULL,0)
******* visitedProcs[0] = A
******* visitedProcs[0] = C
******* FindLockCycleRecurseMember(C ,C ,1,NULL,0) //事务C等待的Lock1锁的等待队列
******* Lock1的等待队列是:C->A
******** 无实边冲突
******** nWaitOrders=0
******** 获取Lock1的等待队列:A<-C
******** FindLockCycleRecurse(A,2,NULL,0)
********* visitedProcs[0] = A
********* visitedProcs[0] = C
********* 在visitedProcs出现过且为起始位置,返回死锁,ndeadlockDetails=2, return true
******** deadlockDetails[0]记录虚边死锁信息
******** possibleConstraints[0]:waiter=C , blocker=A, lock=lock1 ;nPossibleConstraints=1
******** return true
******** return true
******* return true
****** return true
***** return true
**** return 1
*** nPossibleConstraints = 1
*** curConstraints[0]=possibleConstraints[0]=waiter=C , blocker=A, lock=lock1
*** nCurConstraints++
*** DeadLockCheckRecurse(A)
**** TestConfiguration(A)
***** ExpandConstraints(curConstraints,1)
****** nWaitOrders
****** waitOrders[0]=lock=lock1, procs=waitOrderProcs + nWaitOrderProcs ,nprocs=2
****** nWaitOrderProcs += 2 =2
****** TopoSort(lock1,curConstraints,2,waitOrderProcs)
******* topoProcs[0] = A
******* topoProcs[1] = C
******* beforeConstraints[0] = 0
******* beforeConstraints[1] = 1
******* afterConstraints[0] = 1
******* afterConstraints[1] = 0
******* curConstraints[0].pred = 1
******* curConstraints[0].link = 0
******* 开始拓扑排序
******* 第一次找到进程A
******* waitOrderProcs[1] = A
******* 第二次找到进程C
******* waitOrderProcs[0] = C
******* 最终虚边顺序调整
******* return true
****** nWaitOrders++
****** return true
***** FindLockCycle(curConstraints[i].waiter=C, softEdges, &nSoftEdges=1)
****** nVisitedProcs = 0;
****** nDeadlockDetails = 0;
****** possibleConstraints = 0;
****** FindLockCycleRecurse(C,0,NULL,0)
******* visitedProcs[0] = C
******* FindLockCycleRecurseMember(C ,C ,0,NULL,0) //事务A等待的Lock1锁的等待队列
******** Lock1的等待队列是:C->A
******** 无实边冲突
******** nWaitOrders=1
******** FindLockCycleRecurse(A,1,NULL,0)
********* visitedProcs[0] = C
********* visitedProcs[1] = A
********* FindLockCycleRecurseMember(A ,A ,0,NULL,0) //事务A等待的Lock1锁的等待队列
********** Lock1的等待队列是:C->A
********** 无实边冲突
********** nWaitOrders=1
********** break;
********** return false
********* return false
******** return false
******* return false
****** return false
***** FindLockCycle(curConstraints[i].blocker=A, softEdges, &nSoftEdges=1)
***** FindLockCycle(A, softEdges, &nSoftEdges=1))
****** FindLockCycleRecurse(A,1,NULL,0)
******* visitedProcs[0] = A
******* FindLockCycleRecurseMember(A ,A ,0,NULL,0) //事务A等待的Lock1锁的等待队列
******** Lock1的等待队列是:C->A
******** 无实边冲突
******** nWaitOrders=1
******** break;
******** return false
******* return false
****** return false
***** return 0
**** return false 无死锁
*** return false
** nWaitOrders=1
** 根据排序后的waitOrders,重排Lock1的等待队列为A->C
** 唤醒可以被唤醒的进程。
** return DS_SOFT_DEADLOCK 有虚边死锁
* done
【参考】
- 《PostgreSQL数据库内核分析》
- 《Postgresql技术内幕-事务处理深度探索》
- 《PostgreSQL指南:内幕探索》
- pg14源码
相关文章:
11-pg内核之锁管理器(六)死锁检测
概念 每个事务都在等待集合中的另一事务,由于这个集合是一个有限集合,因此一旦在这个等待的链条上产生了环,就会产生死锁。自旋锁和轻量锁属于系统锁,他们目前没有死锁检测机制,只能靠内核开发人员在开发过程中谨慎的…...
Git 与标签管理
在 Git 中,标签 tag 是指向某个 commit 的指针(所以创建和删除都很快)。Git 有 commit id 了,为什么还要有 tag?commit id 是一串无规律的数字,不好记;而 tag 是我们自定义的,例如我…...
【0334】Postgres内核之 auxiliary process(辅助进程)初始化 MyPgXact
1. MyPgXact(ProcGlobal->allPgXact)间接初始化 在上一篇文章【0333】Postgres内核之 auxiliary process(辅助进程)创建 PGPROC 中, 讲解了Postgres内核完成 AuxiliaryProcess 初始化 pid、lxid、procLatch、myProcLocks、lockGroupMembers等所有成员的过程。 这些成员…...
20.1 分析pull模型在k8s中的应用,对比push模型
本节重点介绍 : push模型和pull模型监控系统对比为什么在k8s中只能用pull模型的k8s中主要组件的暴露地址说明 push模型和pull模型监控系统 对比下两种系统采用的不同采集模型,即push型采集和pull型采集。不同的模型在性能的考虑上是截然不同的。下面表格简单的说…...
Ubuntu 镜像替换为阿里云镜像:简化你的下载体验
Ubuntu,作为一款广受欢迎的Linux发行版,以其稳定性和易用性著称。但你是否曾因为下载速度慢而感到沮丧?现在,你可以通过将Ubuntu的默认下载源替换为阿里云镜像来解决这个问题。本文将指导你如何完成这一过程。 为什么选择阿里云镜…...
The Sandbox 游戏制作教程第 6 章|如何使用装备制作出色的游戏 —— 避免环境危险
欢迎回到我们的系列,我们将记录 The Sandbox Game Maker 的 “On-Equip”(装备)功能的多种用途。 如果你刚加入 The Sandbox,装备功能是 “可收集组件”(Collectable Component)中的一个多功能工具…...
JavaScript中的输出方式
1. console.log() console.log() 是开发者在调试代码时最常用的方法。它将信息打印到浏览器的控制台,使开发者能够查看变量的值、程序的执行状态以及其他有用的信息。 用途:用于调试和记录程序运行时的信息。优点:简单易用,适合…...
力扣9.25
2306. 公司命名 给你一个字符串数组 ideas 表示在公司命名过程中使用的名字列表。公司命名流程如下: 从 ideas 中选择 2 个 不同 名字,称为 ideaA 和 ideaB 。 交换 ideaA 和 ideaB 的首字母。 如果得到的两个新名字 都 不在ideas 中,那么 …...
从零开始之AI面试小程序
从零开始之AI面试小程序 文章目录 从零开始之AI面试小程序前言一、工具列表二、开发部署流程1. VMWare安装2. Centos安装3. Centos环境配置3.1. 更改子网IP3.2. 配置静态IP地址 4. Docker和Docker Compose安装5. Docker镜像加速源配置6. 部署中间件6.1. MySQL部署6.2. Redis部署…...
Html2OpenXml:HTML转化为OpenXml的.Net库,轻松实现Html转为Word。
推荐一个开源库,轻松实现HTML转化为OpenXml。 01 项目简介 Html2OpenXml 是一个开源.Net库,旨在将简单或复杂的HTML内容转换为OpenXml组件。 该项目始于2009年,最初是为了将用户评论转换为Word文档而设计的 随着时间的推移,Ht…...
HumanNeRF:Free-viewpoint Rendering of Moving People from Monocular Video 精读
1. 姿态估计和骨架变换模块 人体姿态估计:HumanNeRF 通过已知的单目视频对视频中人物的姿态进行估计。常见的方法是通过人体姿态估计器(如 OpenPose 或 SMPL 模型)提取人物的骨架信息,获取 3D 关节的位置信息。这些关节信息可以帮…...
Springboot中基于注解实现公共字段自动填充
1.使用场景 当我们有大量的表需要管理公共字段,并且希望提高开发效率和确保数据一致性时,使用这种自动填充方式是很有必要的。它可以达到一下作用 统一管理数据库表中的公共字段:如创建时间、修改时间、创建人ID、修改人ID等,这些…...
Android 已经过时的方法用什么新方法替代?
过时修正举例 (Kotlin): getColor(): resources.getColor(R.color.white) //已过时// 修正后:ContextCompat.getColor(this, R.color.white) getDrawable(): resources.getDrawable(R.mipmap.test) //已过时//修正后:ContextCompat.getDrawable(this, R.mipmap.test) //…...
【RocketMQ】MQ与RocketMQ介绍
🎯 导读:本文介绍了消息队列(MQ)的基本概念及其在分布式系统中的作用,包括实现异步通信、削峰限流和应用解耦等方面的优势,并对ActiveMQ、RabbitMQ、RocketMQ及Kafka四种MQ产品进行了对比分析,涵…...
【笔记】自动驾驶预测与决策规划_Part4_时空联合规划
文章目录 0. 前言1. 时空联合规划的基本概念1.1 时空分离方法1.2 时空联合方法 2.基于搜索的时空联合规划 (Hybrid A* )2.1 基于Hybrid A* 的时空联合规划建模2.2 构建三维时空联合地图2.3 基于Hybrid A*的时空节点扩展2.4 Hybrid A* :时空节…...
Linux指令收集
文件和目录操作 ls: 列出目录内容。 -l 显示详细信息。-a 显示隐藏文件(以.开头的文件)。cd: 改变当前工作目录。 cd ~ 返回主目录。cd .. 上移一级目录。pwd: 显示当前工作目录。mkdir: 创建目录。 mkdir -p path/to/directory 创建多级目录。rmdir: 删…...
《C++并发编程实战》笔记(五)
五、内存模型和原子操作 5.1 C中的标准原子类型 原子操作是不可分割的操作,它或者完全做好,或者完全没做。 标准原子类型的定义在头文件<atomic>中,类模板std::atomic<T>接受各种类型的模板实参,从而创建该类型对应…...
在Python中实现多目标优化问题(5)
在Python中实现多目标优化问题 在Python中实现多目标优化,除了传统的进化算法(如NSGA-II、MOEA/D)和机器学习辅助的方法之外,还有一些新的方法和技术。以下是一些较新的或较少被提及的方法: 1. 基于梯度的多目标优化…...
【Linux:共享内存】
共享内存的概念: 操作系统通过页表将共享内存的起始虚拟地址映射到当前进程的地址空间中共享内存是由需要通信的双方进程之一来创建但该资源并不属于创建它的进程,而属于操作系统 共享内存可以在系统中存在多份,供不同个数,不同进…...
今年Java回暖了吗
今年回暖了吗 仅结合师兄和同学的情况 BG 大多双非本 少部分211本 985硕 去年十月一之前 基本转正都失败 十月一之前0 offer 只有很少的人拿到美团 今年十月一之前 有HC的基本都转正了(美团、字节等),目前没有HC的说也有机会(…...
a = Sw,其中a和w是向量,S是矩阵,求w等于什么?w可以写成关于a和S的什么样子的公式
给定公式: a S w a S w aSw 其中: a a a 是已知向量, S S S 是已知矩阵, w w w 是未知向量。 我们的目标是求解 w w w,即将 w w w 表示为 a a a 和 S S S 的函数。 情况 1:矩阵 S S S 可逆 如果矩…...
多线程事务管理:Spring Boot 实现全局事务回滚
多线程事务管理:Spring Boot 实现全局事务回滚 在日常开发中,我们常常会遇到需要在多线程环境下进行数据库操作的场景。这类操作的挑战在于如何保证多个线程中的数据库操作要么一起成功,要么一起失败,即 事务的原子性。尤其是在多个线程并发执行的情况下,确保事务的一致性…...
Vue3 中集成海康 H5 监控视频播放功能
🌈个人主页:前端青山 🔥系列专栏:Vue篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来Vuet篇专栏内容:Vue-集成海康 H5 监控视频播放功能 目录 一、引言 二、环境搭建 三、代码解析 子组件部分 1.…...
Linux: eBPF: libbpf-bootstrap-master 编译
文章目录 简介编译运行展示输出展示:简介 这个是使用libbpf的一个例子; 编译 如果是一个可以联网的机器,这个libbpf-bootstrap的编译就方便了,完全是自动化的下载依赖文件;如果没有,就只能自己准备这些个软件。 需要:libbpf-static; [root@RH8-LCP c]# makeLIB …...
1.1.4 计算机网络的分类
按分布范围分类: 广域网(wan) 城域网(man) 局域网(lan) 个域网(pan) 注意:如今局域网几乎采用“以太网技术实现”,因此“以太网”几乎成了“局域…...
周家庄智慧旅游小程序
项目概述 周家庄智慧旅游小程序将通过数字化手段提升游客的旅游体验,依托周家庄的自然与文化资源,打造智慧旅游新模式。该小程序将结合虚拟现实(VR)、增强现实(AR)和人工智能等技术,提供丰富的…...
【在Linux世界中追寻伟大的One Piece】命名管道
目录 1 -> 命名管道 1.1 -> 创建一个命名管道 1.2 -> 匿名管道与命名管道的区别 1.3 -> 命名管道的打开规则 1.4 -> 例子 1 -> 命名管道 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。如果我们想在不相关的进程之间交换数据&…...
如意控物联网项目-ML307R模组软件及硬件调试环境搭建
软件及硬件调试环境搭建 1、 软件环境搭建及编译 a) 打开官方SDK,内涵APP-DEMO,通过vscode打开程序, 软件程序编写及编译参考下边说明文档链接 OneMO线上服务平台 编译需预安装python3.7以上版本,安装完python后,打开…...
大模型分布式训练并行技术(九)-总结
近年来,随着Transformer、MOE架构的提出,使得深度学习模型轻松突破上万亿规模参数,传统的单机单卡模式已经无法满足超大模型进行训练的要求。因此,我们需要基于单机多卡、甚至是多机多卡进行分布式大模型的训练。 而利用AI集群&a…...
uniapp view设置当前view之外的点击事件
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...
城市门户网站建设/高端网站建设南宁
跳槽不算频繁,但参加过不少面试(电话面试、face to face 面试),面过大 / 小公司、互联网 / 传统软件公司,面糊过(眼高手低,缺乏实战经验,挂掉),也面过人&…...
网站建设 摊销年限/重大新闻事件
一、为什么要有函数?没有函数有什么问题? 1、组织结构不清晰,可读性差 2、代码冗余 3、可扩展性差 二、函数的分类: 1、内置函数:python解释器已经为我们定义好了的函数即内置函数,我们可以拿来就用而无需事…...
wordpress 面包插件/在线培训网站次要关键词
我们知道目前很多应用系统中的内容传输协议采用的HTTP协议,因此不管你是前端人员、后端人员、运维人员,甚至是管理人员,都需要掌握HTTP知识!!HTTP发展历史 HTTP/0.9 该版本只有一个命令GET;没有HEADER等描…...
全景旅游网站建设/太原百度快速排名提升
AJAX的出现,在提升用户体验、减少网络流量、减轻服务器负载的同时,也使Web2.0应用程序的安全问题更加突出。本文在一个小型实验平台的基础上,分析了三类常见的基于AJAX的Web2.0应用程序安全问题:客户端并不安全的安全控制、更多的…...
网站后台管理产品排序/搜索引擎优化的主要工作
目录🚩 一、起因✈️ 二、经过⌛ 1、开始准备📕 2、学习过程🖥️ 3、练习过程🌻 三、总结本文非广告! 仅复盘前段时间考CSDN认证的经历,将归纳和总结分享给大家,如果对您有帮助的话,…...
画江湖网站开发文档/怎样在百度上发表文章
设计模式学习笔记(十三)——Proxy代理模式 Proxy代理模式是一种结构型设计模式,主要解决的问题是:在直接访问对象时带来的问题,比如说:要访问的对象在远程的机器上。在面向对象系统中,有些对象由…...