static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
+static bool GroupLockShouldJumpQueue(LockMethod lockMethodTable,
+ LOCKMODE lockmode,
+ LOCK *lock,
+ PROCLOCK *proclock);
/*
* To make the fast-path lock mechanism work, we must have some way of
/*
* If lock requested conflicts with locks requested by waiters, must join
- * wait queue. Otherwise, check for conflict with already-held locks.
- * (That's last because most complex check.)
+ * wait queue (except for certain cases involving group locking, where
+ * new lockers must sometimes jump the entire wait queue to avoid
+ * deadlock). Otherwise, we can grant ourselves the lock if there are
+ * no conflicts.
*/
if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
- status = STATUS_FOUND;
+ {
+ if (proclock->groupLeader != NULL &&
+ GroupLockShouldJumpQueue(lockMethodTable, lockmode, lock,
+ proclock))
+ status = STATUS_OK;
+ else
+ status = STATUS_FOUND;
+ }
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock);
if (status == STATUS_OK)
{
- /* No conflict with held or previously requested locks */
+ /* We can and should grant ourselves the lock at once */
GrantLock(lock, proclock, lockmode);
GrantLockLocal(locallock, owner);
}
SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
while (otherproclock != NULL)
{
- if (proclock->groupLeader == otherproclock->groupLeader &&
+ if (proclock != otherproclock &&
+ proclock->groupLeader == otherproclock->groupLeader &&
(otherproclock->holdMask & conflictMask) != 0)
{
int intersectMask = otherproclock->holdMask & conflictMask;
return STATUS_FOUND;
}
+/*
+ * GroupLockGrantWithoutWait -- should a group lock be granted without
+ * waiting, despite the presence of conflicting waiters?
+ *
+ * If some member of our locking group already holds a lock on the object,
+ * then we should skip the wait queue and grant ourselves the lock immediately.
+ * This is because we presume lock group members will eventually wait for
+ * each other; thus, if we didn't do this, such situations would result in
+ * an eventual deadlock. However, if a conflicting lock is present that is
+ * not held by another member of our lock group, then we can't do this.
+ * In that case we'll have to wait despite the deadlock risk and hope for
+ * the best.
+ */
+static bool
+GroupLockShouldJumpQueue(LockMethod lockMethodTable,
+ LOCKMODE lockmode,
+ LOCK *lock,
+ PROCLOCK *proclock)
+{
+ int numLockModes = lockMethodTable->numLockModes;
+ LOCKMASK myLocks;
+ int conflictMask = lockMethodTable->conflictTab[lockmode];
+ int conflictsRemaining[MAX_LOCKMODES];
+ int totalConflictsRemaining = 0;
+ int i;
+ SHM_QUEUE *procLocks;
+ PROCLOCK *otherproclock;
+
+ /*
+ * If we're the only member of the lock group, then clearly no other
+ * member holds a lock. We should NOT jump the queue.
+ */
+ if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2)
+ {
+ Assert(proclock->tag.myProc == MyProc);
+ Assert(MyProc->lockGroupMembers == 1);
+ PROCLOCK_PRINT("GroupLockShouldJumpQueue: trivial group", proclock);
+ return false;
+ }
+
+ /* Count the number of lock conflicts, excluding my own locks. */
+ myLocks = proclock->holdMask;
+ for (i = 1; i <= numLockModes; i++)
+ {
+ if ((conflictMask & LOCKBIT_ON(i)) == 0)
+ {
+ conflictsRemaining[i] = 0;
+ continue;
+ }
+ conflictsRemaining[i] = lock->granted[i];
+ if (myLocks & LOCKBIT_ON(i))
+ --conflictsRemaining[i];
+ totalConflictsRemaining += conflictsRemaining[i];
+ }
+
+ /*
+ * Search for locks held by other group members. Even if there are
+ * no conflicts, we can't exit early yet, because we don't know whether
+ * any group member actually holds a lock.
+ */
+ procLocks = &(lock->procLocks);
+ otherproclock = (PROCLOCK *)
+ SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
+ while (otherproclock != NULL)
+ {
+ if (proclock != otherproclock &&
+ proclock->groupLeader == otherproclock->groupLeader &&
+ otherproclock->holdMask != 0)
+ {
+ int intersectMask = otherproclock->holdMask & conflictMask;
+
+ /*
+ * Does the group member hold a lock in 1 or more conflicting
+ * modes? If so, reduce the count of remaining conflicts by the
+ * number of such modes.
+ */
+ if (intersectMask != 0)
+ {
+ for (i = 1; i <= numLockModes; i++)
+ {
+ if ((intersectMask & LOCKBIT_ON(i)) != 0)
+ {
+ if (conflictsRemaining[i] <= 0)
+ elog(PANIC, "proclocks held do not match lock");
+ conflictsRemaining[i]--;
+ totalConflictsRemaining--;
+ }
+ }
+ }
+
+ /*
+ * Whether there were any conflicting modes here or not, the fact
+ * that the lock is held at all makes us eligible to jump the
+ * queue. But we can only do that once the absence of conflicts
+ * is established.
+ */
+ if (totalConflictsRemaining == 0)
+ {
+ PROCLOCK_PRINT("GroupLockShouldJumpQueue: jump", proclock);
+ return true;
+ }
+ }
+ otherproclock = (PROCLOCK *)
+ SHMQueueNext(procLocks, &proclock->lockLink,
+ offsetof(PROCLOCK, lockLink));
+ }
+
+ /* Either no group members hold locks, or there are conflicts. */
+ PROCLOCK_PRINT("GroupLockShouldJumpQueue: fallthrough", proclock);
+ return false;
+}
+
/*
* GrantLock -- update the lock and proclock data structures to show
* the lock request has been granted.