@@ -167,6 +167,9 @@ static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
167167static TransactionId KnownAssignedXidsGetOldestXmin (void );
168168static void KnownAssignedXidsDisplay (int trace_level );
169169static void KnownAssignedXidsReset (void );
170+ static inline void ProcArrayEndTransactionInternal (PGPROC * proc ,
171+ PGXACT * pgxact , TransactionId latestXid );
172+ static void ProcArrayGroupClearXid (PGPROC * proc , TransactionId latestXid );
170173
171174/*
172175 * Report shared-memory space needed by CreateSharedProcArray.
@@ -399,26 +402,18 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
399402 */
400403 Assert (TransactionIdIsValid (allPgXact [proc -> pgprocno ].xid ));
401404
402- LWLockAcquire (ProcArrayLock , LW_EXCLUSIVE );
403-
404- pgxact -> xid = InvalidTransactionId ;
405- proc -> lxid = InvalidLocalTransactionId ;
406- pgxact -> xmin = InvalidTransactionId ;
407- /* must be cleared with xid/xmin: */
408- pgxact -> vacuumFlags &= ~PROC_VACUUM_STATE_MASK ;
409- pgxact -> delayChkpt = false; /* be sure this is cleared in abort */
410- proc -> recoveryConflictPending = false;
411-
412- /* Clear the subtransaction-XID cache too while holding the lock */
413- pgxact -> nxids = 0 ;
414- pgxact -> overflowed = false;
415-
416- /* Also advance global latestCompletedXid while holding the lock */
417- if (TransactionIdPrecedes (ShmemVariableCache -> latestCompletedXid ,
418- latestXid ))
419- ShmemVariableCache -> latestCompletedXid = latestXid ;
420-
421- LWLockRelease (ProcArrayLock );
405+ /*
406+ * If we can immediately acquire ProcArrayLock, we clear our own XID
407+ * and release the lock. If not, use group XID clearing to improve
408+ * efficiency.
409+ */
410+ if (LWLockConditionalAcquire (ProcArrayLock , LW_EXCLUSIVE ))
411+ {
412+ ProcArrayEndTransactionInternal (proc , pgxact , latestXid );
413+ LWLockRelease (ProcArrayLock );
414+ }
415+ else
416+ ProcArrayGroupClearXid (proc , latestXid );
422417 }
423418 else
424419 {
@@ -441,6 +436,137 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
441436 }
442437}
443438
439+ /*
440+ * Mark a write transaction as no longer running.
441+ *
442+ * We don't do any locking here; caller must handle that.
443+ */
444+ static inline void
445+ ProcArrayEndTransactionInternal (PGPROC * proc , PGXACT * pgxact ,
446+ TransactionId latestXid )
447+ {
448+ pgxact -> xid = InvalidTransactionId ;
449+ proc -> lxid = InvalidLocalTransactionId ;
450+ pgxact -> xmin = InvalidTransactionId ;
451+ /* must be cleared with xid/xmin: */
452+ pgxact -> vacuumFlags &= ~PROC_VACUUM_STATE_MASK ;
453+ pgxact -> delayChkpt = false; /* be sure this is cleared in abort */
454+ proc -> recoveryConflictPending = false;
455+
456+ /* Clear the subtransaction-XID cache too while holding the lock */
457+ pgxact -> nxids = 0 ;
458+ pgxact -> overflowed = false;
459+
460+ /* Also advance global latestCompletedXid while holding the lock */
461+ if (TransactionIdPrecedes (ShmemVariableCache -> latestCompletedXid ,
462+ latestXid ))
463+ ShmemVariableCache -> latestCompletedXid = latestXid ;
464+ }
465+
466+ /*
467+ * ProcArrayGroupClearXid -- group XID clearing
468+ *
469+ * When we cannot immediately acquire ProcArrayLock in exclusive mode at
470+ * commit time, add ourselves to a list of processes that need their XIDs
471+ * cleared. The first process to add itself to the list will acquire
472+ * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
473+ * on behalf of all group members. This avoids a great deal of context
474+ * switching when many processes are trying to commit at once, since the lock
475+ * only needs to be handed from the last share-locker to one process waiting
476+ * for the exclusive lock, rather than to each one in turn.
477+ */
478+ static void
479+ ProcArrayGroupClearXid (PGPROC * proc , TransactionId latestXid )
480+ {
481+ volatile PROC_HDR * procglobal = ProcGlobal ;
482+ uint32 nextidx ;
483+ uint32 wakeidx ;
484+ int extraWaits = -1 ;
485+
486+ /* We should definitely have an XID to clear. */
487+ Assert (TransactionIdIsValid (allPgXact [proc -> pgprocno ].xid ));
488+
489+ /* Add ourselves to the list of processes needing a group XID clear. */
490+ proc -> backendLatestXid = latestXid ;
491+ while (true)
492+ {
493+ nextidx = pg_atomic_read_u32 (& procglobal -> nextClearXidElem );
494+ pg_atomic_write_u32 (& proc -> nextClearXidElem , nextidx );
495+
496+ if (pg_atomic_compare_exchange_u32 (& procglobal -> nextClearXidElem ,
497+ & nextidx ,
498+ (uint32 ) proc -> pgprocno ))
499+ break ;
500+ }
501+
502+ /* If the list was not empty, the leader will clear our XID. */
503+ if (nextidx != INVALID_PGPROCNO )
504+ {
505+ /* Sleep until the leader clears our XID. */
506+ while (pg_atomic_read_u32 (& proc -> nextClearXidElem ) != INVALID_PGPROCNO )
507+ {
508+ extraWaits ++ ;
509+ PGSemaphoreLock (& proc -> sem );
510+ }
511+
512+ /* Fix semaphore count for any absorbed wakeups */
513+ while (extraWaits -- > 0 )
514+ PGSemaphoreUnlock (& proc -> sem );
515+ return ;
516+ }
517+
518+ /* We are the leader. Acquire the lock on behalf of everyone. */
519+ LWLockAcquire (ProcArrayLock , LW_EXCLUSIVE );
520+
521+ /*
522+ * Now that we've got the lock, clear the list of processes waiting for
523+ * group XID clearing, saving a pointer to the head of the list.
524+ */
525+ while (true)
526+ {
527+ nextidx = pg_atomic_read_u32 (& procglobal -> nextClearXidElem );
528+ if (pg_atomic_compare_exchange_u32 (& procglobal -> nextClearXidElem ,
529+ & nextidx ,
530+ INVALID_PGPROCNO ))
531+ break ;
532+ }
533+
534+ /* Remember head of list so we can perform wakeups after dropping lock. */
535+ wakeidx = nextidx ;
536+
537+ /* Walk the list and clear all XIDs. */
538+ while (nextidx != INVALID_PGPROCNO )
539+ {
540+ PGPROC * proc = & allProcs [nextidx ];
541+ PGXACT * pgxact = & allPgXact [nextidx ];
542+
543+ ProcArrayEndTransactionInternal (proc , pgxact , proc -> backendLatestXid );
544+
545+ /* Move to next proc in list. */
546+ nextidx = pg_atomic_read_u32 (& proc -> nextClearXidElem );
547+ }
548+
549+ /* We're done with the lock now. */
550+ LWLockRelease (ProcArrayLock );
551+
552+ /*
553+ * Now that we've released the lock, go back and wake everybody up. We
554+ * don't do this under the lock so as to keep lock hold times to a
555+ * minimum. The system calls we need to perform to wake other processes
556+ * up are probably much slower than the simple memory writes we did while
557+ * holding the lock.
558+ */
559+ while (wakeidx != INVALID_PGPROCNO )
560+ {
561+ PGPROC * proc = & allProcs [wakeidx ];
562+
563+ wakeidx = pg_atomic_read_u32 (& proc -> nextClearXidElem );
564+ pg_atomic_write_u32 (& proc -> nextClearXidElem , INVALID_PGPROCNO );
565+
566+ if (proc != MyProc )
567+ PGSemaphoreUnlock (& proc -> sem );
568+ }
569+ }
444570
445571/*
446572 * ProcArrayClearTransaction -- clear the transaction fields
0 commit comments