@@ -89,7 +89,8 @@ static HTAB *gtid2xid;
8989static DtmNodeState * local ;
9090static uint64 totalSleepInterrupts ;
9191static int DtmVacuumDelay = 10 ; /* sec */
92- static bool DtmRecordCommits = 0 ;
92+ static bool finishing_prepared ;
93+
9394
9495DtmCurrentTrans dtm_tx ; // XXXX: make static
9596
@@ -103,6 +104,7 @@ static size_t DtmGetTransactionStateSize(void);
103104static void DtmSerializeTransactionState (void * ctx );
104105static void DtmDeserializeTransactionState (void * ctx );
105106
107+ static void DtmLocalFinish (bool is_commit );
106108
107109static TransactionManager DtmTM = {
108110 PgTransactionIdGetStatus ,
@@ -207,7 +209,8 @@ GlobalSnapshotShmemSize(void)
207209 Size size ;
208210
209211 size = MAXALIGN (sizeof (DtmNodeState ));
210- size = add_size (size , (sizeof (DtmTransId ) + sizeof (DtmTransStatus ) + HASH_PER_ELEM_OVERHEAD * 2 ) * DTM_HASH_INIT_SIZE );
212+ size = add_size (size , DTM_HASH_INIT_SIZE *
213+ (sizeof (DtmTransId ) + sizeof (DtmTransStatus ) + HASH_PER_ELEM_OVERHEAD * 2 ));
211214
212215 return size ;
213216}
@@ -222,22 +225,28 @@ dtm_xact_callback(XactEvent event, void *arg)
222225 DtmLocalBegin (& dtm_tx );
223226 break ;
224227
225- case XACT_EVENT_ABORT :
226- DtmLocalAbort (& dtm_tx );
227- DtmLocalEnd (& dtm_tx );
228+ case XACT_EVENT_ABORT_PREPARED :
229+ // DtmLocalAbortPrepared(&dtm_tx);
230+ finishing_prepared = true;
231+ DtmAdjustOldestXid ();
228232 break ;
229233
230- case XACT_EVENT_COMMIT :
231- DtmLocalCommit (& dtm_tx );
232- DtmLocalEnd (& dtm_tx );
234+ case XACT_EVENT_COMMIT_PREPARED :
235+ // DtmLocalCommitPrepared(&dtm_tx);
236+ finishing_prepared = true;
237+ DtmAdjustOldestXid ();
233238 break ;
234239
235- case XACT_EVENT_ABORT_PREPARED :
236- DtmLocalAbortPrepared (& dtm_tx );
240+ case XACT_EVENT_COMMIT :
241+ DtmLocalFinish (true);
242+ DtmLocalEnd (& dtm_tx );
243+ finishing_prepared = false;
237244 break ;
238245
239- case XACT_EVENT_COMMIT_PREPARED :
240- DtmLocalCommitPrepared (& dtm_tx );
246+ case XACT_EVENT_ABORT :
247+ DtmLocalFinish (false);
248+ DtmLocalEnd (& dtm_tx );
249+ finishing_prepared = false;
241250 break ;
242251
243252 case XACT_EVENT_PRE_PREPARE :
@@ -254,43 +263,6 @@ dtm_xact_callback(XactEvent event, void *arg)
254263 * ***************************************************************************
255264 */
256265
257- static uint32
258- dtm_xid_hash_fn (const void * key , Size keysize )
259- {
260- return (uint32 ) * (TransactionId * ) key ;
261- }
262-
263- static int
264- dtm_xid_match_fn (const void * key1 , const void * key2 , Size keysize )
265- {
266- return * (TransactionId * ) key1 - * (TransactionId * ) key2 ;
267- }
268-
269- static uint32
270- dtm_gtid_hash_fn (const void * key , Size keysize )
271- {
272- GlobalTransactionId id = (GlobalTransactionId ) key ;
273- uint32 h = 0 ;
274-
275- while (* id != 0 )
276- {
277- h = h * 31 + * id ++ ;
278- }
279- return h ;
280- }
281-
282- static void *
283- dtm_gtid_keycopy_fn (void * dest , const void * src , Size keysize )
284- {
285- return strcpy ((char * ) dest , (GlobalTransactionId ) src );
286- }
287-
288- static int
289- dtm_gtid_match_fn (const void * key1 , const void * key2 , Size keysize )
290- {
291- return strcmp ((GlobalTransactionId ) key1 , (GlobalTransactionId ) key2 );
292- }
293-
294266static char const *
295267DtmGetName (void )
296268{
@@ -480,17 +452,11 @@ DtmInitialize()
480452void
481453DtmLocalBegin (DtmCurrentTrans * x )
482454{
483- if (!TransactionIdIsValid (x -> xid ))
484- {
485455 SpinLockAcquire (& local -> lock );
486- // x->xid = GetCurrentTransactionIdIfAny();
487456 x -> cid = INVALID_CID ;
488- x -> is_global = false;
489- x -> is_prepared = false;
490457 x -> snapshot = dtm_get_cid ();
491458 SpinLockRelease (& local -> lock );
492459 DTM_TRACE ((stderr , "DtmLocalBegin: transaction %u uses local snapshot %lu\n" , x -> xid , x -> snapshot ));
493- }
494460}
495461
496462/*
@@ -516,7 +482,6 @@ DtmLocalExtend(GlobalTransactionId gtid)
516482 strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
517483 SpinLockRelease (& local -> lock );
518484 }
519- x -> is_global = true;
520485 DtmInitGlobalXmin (TransactionXmin );
521486 return x -> snapshot ;
522487}
@@ -543,7 +508,6 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
543508 }
544509 local_cid = dtm_sync (global_cid );
545510 x -> snapshot = global_cid ;
546- x -> is_global = true;
547511 }
548512 strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
549513 SpinLockRelease (& local -> lock );
@@ -627,85 +591,59 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
627591 }
628592 SpinLockRelease (& local -> lock );
629593
630- /*
631- * Record commit in pg_committed_xact table to be make it possible to
632- * perform recovery in case of crash of some of cluster nodes
633- */
634- if (DtmRecordCommits )
635- {
636- char stmt [MAX_GTID_SIZE + 64 ];
637- int rc ;
638-
639- sprintf (stmt , "insert into pg_committed_xacts values ('%s')" , gtid );
640- SPI_connect ();
641- rc = SPI_execute (stmt , true, 0 );
642- SPI_finish ();
643- if (rc != SPI_OK_INSERT )
644- {
645- elog (ERROR , "Failed to insert GTID %s in table pg_committed_xacts" , gtid );
646- }
647- }
648594}
649595
650596/*
651- * Mark tranasction as prepared
597+ * Set transaction status to committed
652598 */
653599void
654- DtmLocalCommitPrepared ( DtmCurrentTrans * x )
600+ DtmLocalFinish ( bool is_commit )
655601{
656- if (!x -> gtid [0 ])
657- return ;
658-
659- Assert (x -> gtid != NULL );
602+ DtmCurrentTrans * x = & dtm_tx ;
603+ TransactionId xid = GetCurrentTransactionIdIfAny ();
660604
661- SpinLockAcquire ( & local -> lock );
605+ if ( x -> gtid [ 0 ] && finishing_prepared )
662606 {
663- DtmTransId * id = ( DtmTransId * ) hash_search ( gtid2xid , x -> gtid , HASH_REMOVE , NULL );
607+ // Assert(!TransactionIdIsValid(xid) );
664608
665- Assert (id != NULL );
609+ SpinLockAcquire (& local -> lock );
610+ {
611+ DtmTransId * id = (DtmTransId * ) hash_search (gtid2xid , x -> gtid , HASH_REMOVE , NULL );
666612
667- x -> is_global = true;
668- x -> is_prepared = true;
669- x -> xid = id -> xid ;
670- free (id -> subxids );
613+ Assert (id != NULL );
614+ Assert (TransactionIdIsValid (id -> xid ));
671615
672- DTM_TRACE ((stderr , "Global transaction %u(%s) is precommitted\n" , x -> xid , gtid ));
616+ xid = id -> xid ;
617+ free (id -> subxids );
618+ }
619+ SpinLockRelease (& local -> lock );
620+ }
621+ else if (!TransactionIdIsValid (xid ))
622+ {
623+ return ;
673624 }
674- SpinLockRelease (& local -> lock );
675-
676- DtmAdjustOldestXid ();
677- // elog(LOG, "DtmLocalCommitPrepared %d", x->xid);
678- }
679-
680- /*
681- * Set transaction status to committed
682- */
683- void
684- DtmLocalCommit (DtmCurrentTrans * x )
685- {
686- // if (!x->is_global)
687- // return;
688625
689626 SpinLockAcquire (& local -> lock );
690- if (TransactionIdIsValid (x -> xid ))
691627 {
692628 bool found ;
693629 DtmTransStatus * ts ;
694630
695- ts = (DtmTransStatus * ) hash_search (xid2status , & x -> xid , HASH_ENTER , & found );
696- ts -> status = TRANSACTION_STATUS_COMMITTED ;
631+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
632+ ts -> status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED ;
697633 if (found )
698634 {
699- int i ;
700- DtmTransStatus * sts = ts ;
701635
702- Assert (found );
703- Assert (x -> is_global );
704- for (i = 0 ; i < ts -> nSubxids ; i ++ )
636+ if (is_commit ) // XXX: why only for commit?
705637 {
706- sts = sts -> next ;
707- Assert (sts -> cid == ts -> cid );
708- sts -> status = TRANSACTION_STATUS_COMMITTED ;
638+ int i ;
639+ DtmTransStatus * sts = ts ;
640+
641+ for (i = 0 ; i < ts -> nSubxids ; i ++ )
642+ {
643+ sts = sts -> next ;
644+ Assert (sts -> cid == ts -> cid );
645+ sts -> status = TRANSACTION_STATUS_COMMITTED ;
646+ }
709647 }
710648 }
711649 else
@@ -715,86 +653,31 @@ DtmLocalCommit(DtmCurrentTrans * x)
715653 Assert (!found );
716654 ts -> cid = dtm_get_cid ();
717655 DtmTransactionListAppend (ts );
718- ts -> nSubxids = xactGetCommittedChildren (& subxids );
719- DtmAddSubtransactions (ts , subxids , ts -> nSubxids );
656+ if (is_commit ) // XXX: why?
657+ {
658+ ts -> nSubxids = xactGetCommittedChildren (& subxids );
659+ DtmAddSubtransactions (ts , subxids , ts -> nSubxids );
660+ }
661+ else
662+ {
663+ ts -> nSubxids = 0 ;
664+ }
720665 }
721666 x -> cid = ts -> cid ;
722667 DTM_TRACE ((stderr , "Local transaction %u is committed at %lu\n" , x -> xid , x -> cid ));
723668 }
724669 SpinLockRelease (& local -> lock );
725670
726- DtmAdjustOldestXid ();
671+ // DtmAdjustOldestXid();
727672 // elog(LOG, "DtmLocalCommit %d", x->xid);
728673}
729674
730- /*
731- * Mark tranasction as prepared
732- */
733- void
734- DtmLocalAbortPrepared (DtmCurrentTrans * x )
735- {
736- if (!x -> gtid [0 ])
737- return ;
738-
739- Assert (x -> gtid != NULL );
740-
741- SpinLockAcquire (& local -> lock );
742- {
743- DtmTransId * id = (DtmTransId * ) hash_search (gtid2xid , x -> gtid , HASH_REMOVE , NULL );
744-
745- Assert (id != NULL );
746- x -> is_global = true;
747- x -> is_prepared = true;
748- x -> xid = id -> xid ;
749- free (id -> subxids );
750- DTM_TRACE ((stderr , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
751- }
752- SpinLockRelease (& local -> lock );
753- }
754-
755- /*
756- * Set transaction status to aborted
757- */
758- void
759- DtmLocalAbort (DtmCurrentTrans * x )
760- {
761- if (!TransactionIdIsValid (x -> xid ))
762- return ;
763-
764- SpinLockAcquire (& local -> lock );
765- {
766- bool found ;
767- DtmTransStatus * ts ;
768-
769- Assert (TransactionIdIsValid (x -> xid ));
770- ts = (DtmTransStatus * ) hash_search (xid2status , & x -> xid , HASH_ENTER , & found );
771- if (found )
772- {
773- Assert (found );
774- Assert (x -> is_global );
775- }
776- else
777- {
778- Assert (!found );
779- ts -> cid = dtm_get_cid ();
780- ts -> nSubxids = 0 ;
781- DtmTransactionListAppend (ts );
782- }
783- x -> cid = ts -> cid ;
784- ts -> status = TRANSACTION_STATUS_ABORTED ;
785- DTM_TRACE ((stderr , "Local transaction %u is aborted at %lu\n" , x -> xid , x -> cid ));
786- }
787- SpinLockRelease (& local -> lock );
788- }
789-
790675/*
791676 * Cleanup dtm_tx structure
792677 */
793678void
794679DtmLocalEnd (DtmCurrentTrans * x )
795680{
796- x -> is_global = false;
797- x -> is_prepared = false;
798681 x -> xid = InvalidTransactionId ;
799682 x -> cid = INVALID_CID ;
800683}
@@ -854,7 +737,6 @@ DtmGetCsn(TransactionId xid)
854737void
855738DtmLocalSavePreparedState (DtmCurrentTrans * x )
856739{
857- // x->is_prepared = true;
858740
859741 if (x -> gtid [0 ])
860742 {
0 commit comments