@@ -48,6 +48,7 @@ typedef struct DtmTransStatus
4848{
4949 TransactionId xid ;
5050 XidStatus status ;
51+ int nSubxids ;
5152 cid_t cid ;
5253 struct DtmTransStatus * next ;
5354} DtmTransStatus ;
@@ -65,6 +66,8 @@ typedef struct
6566{
6667 char gtid [MAX_GTID_SIZE ];
6768 TransactionId xid ;
69+ TransactionId * subxids ;
70+ int nSubxids ;
6871} DtmTransId ;
6972
7073
@@ -75,19 +78,19 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
7578static HTAB * xid2status ;
7679static HTAB * gtid2xid ;
7780static DtmNodeState * local ;
78- static DtmTransState dtm_tx ;
81+ static DtmCurrentTrans dtm_tx ;
7982static uint64 totalSleepInterrupts ;
8083static int DtmVacuumDelay ;
8184
8285static Snapshot DtmGetSnapshot (Snapshot snapshot );
8386static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum );
8487static bool DtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
8588static TransactionId DtmAdjustOldestXid (TransactionId xid );
86- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
8789static bool DtmDetectGlobalDeadLock (PGPROC * proc );
8890static cid_t DtmGetCsn (TransactionId xid );
91+ static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids );
8992
90- static TransactionManager DtmTM = { PgTransactionIdGetStatus , DtmSetTransactionStatus , DtmGetSnapshot , PgGetNewTransactionId , DtmGetOldestXmin , PgTransactionIdIsInProgress , PgGetGlobalTransactionId , DtmXidInMVCCSnapshot , DtmDetectGlobalDeadLock };
93+ static TransactionManager DtmTM = { PgTransactionIdGetStatus , PgTransactionIdSetTreeStatus , DtmGetSnapshot , PgGetNewTransactionId , DtmGetOldestXmin , PgTransactionIdIsInProgress , PgGetGlobalTransactionId , DtmXidInMVCCSnapshot , DtmDetectGlobalDeadLock };
9194
9295void _PG_init (void );
9396void _PG_fini (void );
@@ -290,6 +293,7 @@ dtm_xact_callback(XactEvent event, void *arg)
290293 break ;
291294
292295 case XACT_EVENT_PREPARE :
296+ DtmLocalSavePreparedState (dtm_get_global_trans_id ());
293297 DtmLocalEnd (& dtm_tx );
294298 break ;
295299
@@ -462,6 +466,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
462466 static timestamp_t firstReportTime ;
463467 static timestamp_t prevReportTime ;
464468 static timestamp_t totalSleepTime ;
469+ static timestamp_t maxSleepTime ;
465470#endif
466471 timestamp_t delay = MIN_WAIT_TIMEOUT ;
467472 Assert (xid != InvalidTransactionId );
@@ -476,13 +481,6 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
476481 while (true)
477482 {
478483 DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & xid , HASH_FIND , NULL );
479- if (ts == NULL &&
480- !(TransactionIdFollowsOrEquals (xid , snapshot -> xmax ) || TransactionIdPrecedes (xid , snapshot -> xmin )))
481- {
482- //TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
483- TransactionId subxid = SubTransGetTopmostTransaction (xid );
484- ts = (DtmTransStatus * )hash_search (xid2status , & subxid , HASH_FIND , NULL );
485- }
486484 if (ts != NULL )
487485 {
488486 if (ts -> cid > dtm_tx .snapshot ) {
@@ -497,17 +495,21 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
497495 SpinLockRelease (& local -> lock );
498496#if TRACE_SLEEP_TIME
499497 {
500- timestamp_t now = dtm_get_current_time ();
498+ timestamp_t delta , now = dtm_get_current_time ();
501499#endif
502500 dtm_sleep (delay );
503501#if TRACE_SLEEP_TIME
504- totalSleepTime += dtm_get_current_time () - now ;
505- if (now > prevReportTime + USEC * 1 ) {
502+ delta = dtm_get_current_time () - now ;
503+ totalSleepTime += delta ;
504+ if (delta > maxSleepTime ) {
505+ maxSleepTime = delta ;
506+ }
507+ if (now > prevReportTime + USEC * 10 ) {
506508 prevReportTime = now ;
507509 if (firstReportTime == 0 ) {
508510 firstReportTime = now ;
509511 } else {
510- fprintf (stderr , "Snapshot sleep %lu of %lu usec (%f%%)\n" , totalSleepTime , now - firstReportTime , totalSleepTime * 100.0 /(now - firstReportTime ));
512+ fprintf (stderr , "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu \n" , totalSleepTime , now - firstReportTime , totalSleepTime * 100.0 /(now - firstReportTime ), maxSleepTime );
511513 }
512514 }
513515 }
@@ -577,7 +579,7 @@ void DtmInitialize()
577579}
578580
579581
580- void DtmLocalBegin (DtmTransState * x )
582+ void DtmLocalBegin (DtmCurrentTrans * x )
581583{
582584 if (x -> xid == InvalidTransactionId ) {
583585 SpinLockAcquire (& local -> lock );
@@ -592,30 +594,32 @@ void DtmLocalBegin(DtmTransState* x)
592594 }
593595}
594596
595- cid_t DtmLocalExtend (DtmTransState * x , GlobalTransactionId gtid )
597+ cid_t DtmLocalExtend (DtmCurrentTrans * x , GlobalTransactionId gtid )
596598{
597599 if (gtid != NULL ) {
598600 SpinLockAcquire (& local -> lock );
599601 {
600602 DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_ENTER , NULL );
601- x -> is_global = true;
602603 id -> xid = x -> xid ;
604+ id -> nSubxids = 0 ;
605+ id -> subxids = 0 ;
603606 }
604607 SpinLockRelease (& local -> lock );
605- } else {
606- x -> is_global = true;
607- }
608+ }
609+ x -> is_global = true;
608610 return x -> snapshot ;
609611}
610612
611- cid_t DtmLocalAccess (DtmTransState * x , GlobalTransactionId gtid , cid_t global_cid )
613+ cid_t DtmLocalAccess (DtmCurrentTrans * x , GlobalTransactionId gtid , cid_t global_cid )
612614{
613615 cid_t local_cid ;
614616 SpinLockAcquire (& local -> lock );
615617 {
616618 if (gtid != NULL ) {
617619 DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_ENTER , NULL );
618620 id -> xid = x -> xid ;
621+ id -> nSubxids = 0 ;
622+ id -> subxids = 0 ;
619623 }
620624 local_cid = dtm_sync (global_cid );
621625 x -> snapshot = local_cid ;
@@ -638,7 +642,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid)
638642 ts = (DtmTransStatus * )hash_search (xid2status , & id -> xid , HASH_ENTER , NULL );
639643 ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
640644 ts -> cid = dtm_get_cid ();
645+ ts -> nSubxids = id -> nSubxids ;
641646 DtmTransactionListAppend (ts );
647+ DtmAddSubtransactions (ts , id -> subxids , id -> nSubxids );
642648 }
643649 SpinLockRelease (& local -> lock );
644650}
@@ -661,22 +667,26 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
661667 {
662668 DtmTransStatus * ts ;
663669 DtmTransId * id ;
670+ int i ;
664671
665672 id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
666673 Assert (id != NULL );
667674
668675 ts = (DtmTransStatus * )hash_search (xid2status , & id -> xid , HASH_FIND , NULL );
669676 Assert (ts != NULL );
670677 ts -> cid = cid ;
671-
678+ for (i = 0 ; i < ts -> nSubxids ; i ++ ) {
679+ ts = ts -> next ;
680+ ts -> cid = cid ;
681+ }
672682 dtm_sync (cid );
673683
674684 DTM_TRACE ((stderr , "Prepare transaction %u(%s) with CSN %lu\n" , id -> xid , gtid , cid ));
675685 }
676686 SpinLockRelease (& local -> lock );
677687}
678688
679- void DtmLocalCommitPrepared (DtmTransState * x , GlobalTransactionId gtid )
689+ void DtmLocalCommitPrepared (DtmCurrentTrans * x , GlobalTransactionId gtid )
680690{
681691 Assert (gtid != NULL );
682692
@@ -688,33 +698,45 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
688698 x -> is_global = true;
689699 x -> is_prepared = true;
690700 x -> xid = id -> xid ;
701+ free (id -> subxids );
702+
691703 DTM_TRACE ((stderr , "Global transaction %u(%s) is precommitted\n" , x -> xid , gtid ));
692704 }
693705 SpinLockRelease (& local -> lock );
694706}
695707
696- void DtmLocalCommit (DtmTransState * x )
708+ void DtmLocalCommit (DtmCurrentTrans * x )
697709{
698710 SpinLockAcquire (& local -> lock );
699711 {
700712 bool found ;
701713 DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & x -> xid , HASH_ENTER , & found );
714+ ts -> status = TRANSACTION_STATUS_COMMITTED ;
702715 if (x -> is_prepared ) {
716+ int i ;
717+ DtmTransStatus * sts = ts ;
703718 Assert (found );
704719 Assert (x -> is_global );
705- } else if (!found ) {
706- //Assert(!found);
720+ for (i = 0 ; i < ts -> nSubxids ; i ++ ) {
721+ sts = sts -> next ;
722+ Assert (sts -> cid == ts -> cid );
723+ sts -> status = TRANSACTION_STATUS_COMMITTED ;
724+ }
725+ } else {
726+ TransactionId * subxids ;
727+ Assert (!found );
707728 ts -> cid = dtm_get_cid ();
708729 DtmTransactionListAppend (ts );
730+ ts -> nSubxids = xactGetCommittedChildren (& subxids );
731+ DtmAddSubtransactions (ts , subxids , ts -> nSubxids );
709732 }
710733 x -> cid = ts -> cid ;
711- ts -> status = TRANSACTION_STATUS_COMMITTED ;
712734 DTM_TRACE ((stderr , "Local transaction %u is committed at %lu\n" , x -> xid , x -> cid ));
713735 }
714736 SpinLockRelease (& local -> lock );
715737}
716738
717- void DtmLocalAbortPrepared (DtmTransState * x , GlobalTransactionId gtid )
739+ void DtmLocalAbortPrepared (DtmCurrentTrans * x , GlobalTransactionId gtid )
718740{
719741 Assert (gtid != NULL );
720742
@@ -726,13 +748,14 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
726748 x -> is_global = true;
727749 x -> is_prepared = true;
728750 x -> xid = id -> xid ;
751+ free (id -> subxids );
729752
730753 DTM_TRACE ((stderr , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
731754 }
732755 SpinLockRelease (& local -> lock );
733756}
734757
735- void DtmLocalAbort (DtmTransState * x )
758+ void DtmLocalAbort (DtmCurrentTrans * x )
736759{
737760 SpinLockAcquire (& local -> lock );
738761 {
@@ -741,9 +764,10 @@ void DtmLocalAbort(DtmTransState* x)
741764 if (x -> is_prepared ) {
742765 Assert (found );
743766 Assert (x -> is_global );
744- } else if (! found ) {
745- // Assert(!found);
767+ } else {
768+ Assert (!found );
746769 ts -> cid = dtm_get_cid ();
770+ ts -> nSubxids = 0 ;
747771 DtmTransactionListAppend (ts );
748772 }
749773 x -> cid = ts -> cid ;
@@ -753,39 +777,14 @@ void DtmLocalAbort(DtmTransState* x)
753777 SpinLockRelease (& local -> lock );
754778}
755779
756- void DtmLocalEnd (DtmTransState * x )
780+ void DtmLocalEnd (DtmCurrentTrans * x )
757781{
758782 x -> is_global = false;
759783 x -> is_prepared = false;
760784 x -> xid = InvalidTransactionId ;
761785 x -> cid = INVALID_CID ;
762786}
763787
764- void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
765- {
766- if (nsubxids != 0 ) {
767- SpinLockAcquire (& local -> lock );
768- {
769- int i ;
770- bool found ;
771- DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & xid , HASH_ENTER , & found );
772- if (!found ) {
773- ts -> cid = dtm_get_cid ();
774- }
775- ts -> status = status ;
776- for (i = 0 ; i < nsubxids ; i ++ ) {
777- DtmTransStatus * sts = (DtmTransStatus * )hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
778- Assert (!found );
779- sts -> status = status ;
780- sts -> cid = ts -> cid ;
781- DtmTransactionListInsertAfter (ts , sts );
782- }
783- }
784- SpinLockRelease (& local -> lock );
785- }
786- PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
787- }
788-
789788bool DtmDetectGlobalDeadLock (PGPROC * proc )
790789{
791790 elog (WARNING , "Global deadlock?" );
@@ -806,3 +805,36 @@ static cid_t DtmGetCsn(TransactionId xid)
806805 return csn ;
807806}
808807
808+ void DtmLocalSavePreparedState (GlobalTransactionId gtid )
809+ {
810+ if (gtid != NULL ) {
811+ SpinLockAcquire (& local -> lock );
812+ {
813+ DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
814+ if (id != NULL ) {
815+ TransactionId * subxids ;
816+ int nSubxids = xactGetCommittedChildren (& subxids );
817+ if (nSubxids != 0 ) {
818+ id -> subxids = (TransactionId * )malloc (nSubxids * sizeof (TransactionId ));
819+ id -> nSubxids = nSubxids ;
820+ memcpy (id -> subxids , subxids , nSubxids * sizeof (TransactionId ));
821+ }
822+ }
823+ }
824+ SpinLockRelease (& local -> lock );
825+ }
826+ }
827+
828+ static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids )
829+ {
830+ int i ;
831+ for (i = 0 ; i < nSubxids ; i ++ ) {
832+ bool found ;
833+ DtmTransStatus * sts = (DtmTransStatus * )hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
834+ Assert (!found );
835+ sts -> status = ts -> status ;
836+ sts -> cid = ts -> cid ;
837+ sts -> nSubxids = 0 ;
838+ DtmTransactionListInsertAfter (ts , sts );
839+ }
840+ }
0 commit comments