@@ -71,7 +71,7 @@ void _PG_fini(void);
7171static Snapshot DtmGetSnapshot (Snapshot snapshot );
7272static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
7373static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
74- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
74+ static bool DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
7575static void DtmUpdateRecentXmin (Snapshot snapshot );
7676static void DtmInitialize (void );
7777static void DtmXactCallback (XactEvent event , void * arg );
@@ -627,8 +627,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
627627 return status ;
628628}
629629
630- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
630+ static bool DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
631631{
632+ bool acknowledged = true;
632633 XTM_INFO ("%d: DtmSetTransactionStatus %u = %u\n" , getpid (), xid , status );
633634 if (!RecoveryInProgress ())
634635 {
@@ -640,7 +641,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
640641 PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
641642 DtmGlobalSetTransStatus (xid , status , false);
642643 XTM_INFO ("Abort transaction %d\n" , xid );
643- return ;
644+ return true ;
644645 }
645646 else
646647 {
@@ -649,8 +650,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
649650 LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
650651 hash_search (xid_in_doubt , & DtmNextXid , HASH_ENTER , NULL );
651652 LWLockRelease (dtm -> hashLock );
652- DtmGlobalSetTransStatus (xid , status , true);
653- XTM_INFO ("Commit transaction %d\n" , xid );
653+ if (!DtmGlobalSetTransStatus (xid , status , true)) {
654+ acknowledged = false;
655+ XTM_INFO ("Commit of transaction %d in rejected by DTM\n" , xid );
656+ status = TRANSACTION_STATUS_ABORTED ;
657+ } else {
658+ XTM_INFO ("Commit transaction %d\n" , xid );
659+ }
654660 }
655661 }
656662 else
@@ -661,11 +667,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
661667 else
662668 {
663669 XidStatus gs ;
664- gs = DtmGlobalGetTransStatus (xid , false);
665- if (gs != TRANSACTION_STATUS_UNKNOWN )
670+ gs = DtmGlobalGetTransStatus (xid , false);
671+ if (gs != TRANSACTION_STATUS_UNKNOWN ) {
672+ acknowledged = status == gs ;
666673 status = gs ;
674+ }
667675 }
668- PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
676+ return PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn ) && acknowledged ;
669677}
670678
671679static uint32 dtm_xid_hash_fn (const void * key , Size keysize )
@@ -991,27 +999,35 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
991999{
9921000 ByteBuffer * buf = (ByteBuffer * )arg ;
9931001 LOCK * lock = proclock -> tag .myLock ;
994- PGPROC * proc = proclock -> tag .myProc ;
1002+ PGPROC * proc = proclock -> tag .myProc ;
1003+
9951004 if (lock != NULL ) {
996- if (proc -> waitLock == lock ) {
1005+ PGXACT * srcPgXact = & ProcGlobal -> allPgXact [proc -> pgprocno ];
1006+
1007+ if (TransactionIdIsValid (srcPgXact -> xid ) && proc -> waitLock == lock ) {
9971008 LockMethod lockMethodTable = GetLocksMethodTable (lock );
9981009 int numLockModes = lockMethodTable -> numLockModes ;
9991010 int conflictMask = lockMethodTable -> conflictTab [proc -> waitLockMode ];
10001011 SHM_QUEUE * procLocks = & (lock -> procLocks );
10011012 int lm ;
10021013
1003- ByteBufferAppendInt32 (buf , proc -> lxid ); /* waiting transaction */
1014+ ByteBufferAppendInt32 (buf , srcPgXact -> xid ); /* waiting transaction */
10041015 proclock = (PROCLOCK * ) SHMQueueNext (procLocks , procLocks ,
10051016 offsetof(PROCLOCK , lockLink ));
10061017 while (proclock )
10071018 {
10081019 if (proc != proclock -> tag .myProc ) {
1009- for (lm = 1 ; lm <= numLockModes ; lm ++ )
1010- {
1011- if ((proclock -> holdMask & LOCKBIT_ON (lm )) && (conflictMask & LOCKBIT_ON (lm )))
1020+ PGXACT * dstPgXact = & ProcGlobal -> allPgXact [proclock -> tag .myProc -> pgprocno ];
1021+ if (TransactionIdIsValid (dstPgXact -> xid )) {
1022+ Assert (srcPgXact -> xid != dstPgXact -> xid );
1023+ for (lm = 1 ; lm <= numLockModes ; lm ++ )
10121024 {
1013- ByteBufferAppendInt32 (buf , proclock -> tag .myProc -> lxid ); /* transaction holding lock */
1014- break ;
1025+ if ((proclock -> holdMask & LOCKBIT_ON (lm )) && (conflictMask & LOCKBIT_ON (lm )))
1026+ {
1027+ XTM_INFO ("%d: %u(%u) waits for %u(%u)\n" , getpid (), srcPgXact -> xid , proc -> pid , dstPgXact -> xid , proclock -> tag .myProc -> pid );
1028+ ByteBufferAppendInt32 (buf , dstPgXact -> xid ); /* transaction holding lock */
1029+ break ;
1030+ }
10151031 }
10161032 }
10171033 }
@@ -1025,12 +1041,19 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
10251041
10261042bool DtmDetectGlobalDeadLock (PGPROC * proc )
10271043{
1028- bool hasDeadlock ;
1044+ bool hasDeadlock = false ;
10291045 ByteBuffer buf ;
1030- ByteBufferAlloc (& buf );
1031- EnumerateLocks (DtmSerializeLock , & buf );
1032- hasDeadlock = DtmGlobalDetectDeadLock (PostPortNumber , proc -> lxid , buf .data , buf .used );
1033- ByteBufferFree (& buf );
1034- elog (NOTICE , "Deadlock detected for transaction %u" , proc -> lxid );
1046+ PGXACT * pgxact = & ProcGlobal -> allPgXact [proc -> pgprocno ];
1047+
1048+ if (TransactionIdIsValid (pgxact -> xid )) {
1049+ ByteBufferAlloc (& buf );
1050+ XTM_INFO ("%d: wait graph begin\n" , getpid ());
1051+ EnumerateLocks (DtmSerializeLock , & buf );
1052+ XTM_INFO ("%d: wait graph end\n" , getpid ());
1053+ hasDeadlock = DtmGlobalDetectDeadLock (PostPortNumber , pgxact -> xid , buf .data , buf .used );
1054+ ByteBufferFree (& buf );
1055+ XTM_INFO ("%d: deadlock detected for %u\n" , getpid (), pgxact -> xid );
1056+ elog (WARNING , "Deadlock detected for transaction %u" , pgxact -> xid );
1057+ }
10351058 return hasDeadlock ;
10361059}
0 commit comments