@@ -295,6 +295,29 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
295295 }
296296}
297297
298+ /*
299+ * Add subtransactions to finished transactions list.
300+ * Copy CSN and status of parent transaction.
301+ */
302+ static void
303+ DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids )
304+ {
305+ int i ;
306+
307+ for (i = 0 ; i < nSubxids ; i ++ )
308+ {
309+ bool found ;
310+ DtmTransStatus * sts ;
311+
312+ Assert (TransactionIdIsValid (subxids [i ]));
313+ sts = (DtmTransStatus * ) hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
314+ Assert (!found );
315+ sts -> cid = ts -> cid ;
316+ sts -> nSubxids = 0 ;
317+ DtmTransactionListInsertAfter (ts , sts );
318+ }
319+ }
320+
298321/*
299322 * There can be different oldest XIDs at different cluster node.
300323 * Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -465,27 +488,19 @@ DtmLocalBegin(DtmCurrentTrans * x)
465488 * Returns snapshot of current transaction.
466489 */
467490cid_t
468- DtmLocalExtend (GlobalTransactionId gtid )
491+ DtmLocalExtend ()
469492{
470- DtmCurrentTrans * x = & dtm_tx ;
471-
472- if (gtid != NULL )
473- {
474- strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
475- }
476493 DtmInitGlobalXmin (TransactionXmin );
477-
478494 dtm_tx .is_global = true;
479-
480- return x -> snapshot ;
495+ return dtm_tx .snapshot ;
481496}
482497
483498/*
484499 * This function is executed on all nodes joining distributed transaction.
485500 * global_cid is snapshot taken from node initiated this transaction
486501 */
487502cid_t
488- DtmLocalAccess (DtmCurrentTrans * x , GlobalTransactionId gtid , cid_t global_cid )
503+ DtmLocalAccess (cid_t global_cid )
489504{
490505 cid_t local_cid ;
491506
@@ -494,9 +509,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
494509 SpinLockAcquire (& local -> lock );
495510 {
496511 local_cid = dtm_sync (global_cid );
497- x -> snapshot = global_cid ;
512+ dtm_tx . snapshot = global_cid ;
498513 }
499- strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
500514 SpinLockRelease (& local -> lock );
501515
502516 dtm_tx .is_global = true;
@@ -510,6 +524,37 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
510524 return global_cid ;
511525}
512526
527+
528+ /*
529+ * Save state of parepared transaction
530+ */
531+ void
532+ DtmLocalSavePreparedState (DtmCurrentTrans * x )
533+ {
534+
535+ if (dtm_tx .is_global )
536+ {
537+ TransactionId * subxids ;
538+ TransactionId xid = GetCurrentTransactionId ();
539+ int nSubxids = xactGetCommittedChildren (& subxids );
540+
541+ SpinLockAcquire (& local -> lock );
542+ {
543+ DtmTransStatus * ts ;
544+ bool found ;
545+
546+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
547+ Assert (!found );
548+ ts -> cid = InDoubtGlobalCSN ;
549+ ts -> nSubxids = nSubxids ;
550+ DtmTransactionListAppend (ts );
551+ DtmAddSubtransactions (ts , subxids , nSubxids );
552+ }
553+ SpinLockRelease (& local -> lock );
554+ }
555+ }
556+
557+
513558/*
514559 * Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
515560 * wait until it is either committed either aborted
@@ -587,10 +632,8 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
587632 return ; // global ro tx
588633 }
589634
590-
591635 dtm_tx .xid = xid ;
592636 dtm_tx .csn = cid ;
593-
594637}
595638
596639/*
@@ -599,43 +642,57 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
599642void
600643DtmLocalFinish (bool is_commit )
601644{
602- DtmCurrentTrans * x = & dtm_tx ;
603645 TransactionId xid = GetCurrentTransactionIdIfAny ();
646+ bool found ;
647+ DtmTransStatus * ts ;
604648
605- if (x -> gtid [0 ] && finishing_prepared )
649+ // We can't check just TransactionIdIsValid(dtm_tx.xid) because
650+ // then we catch commit of `select pg_global_snaphot_end_prepare(...)`
651+ if (TransactionIdIsValid (dtm_tx .xid ) &&
652+ (finishing_prepared || // commit prepared of global
653+ TransactionIdIsValid (xid ))) // ordinary commit of global
606654 {
655+ // Commit of global prepared tx
656+
607657 xid = dtm_tx .xid ;
658+ Assert (GlobalCSNIsNormal (dtm_tx .csn ));
659+
660+ SpinLockAcquire (& local -> lock );
661+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_FIND , & found );
662+ Assert (found );
663+ ts -> cid = is_commit ? dtm_tx .csn : AbortedGlobalCSN ;
664+ DtmAdjustSubtransactions (ts ); // !
665+ SpinLockRelease (& local -> lock );
666+
667+ dtm_tx .xid = InvalidTransactionId ;
668+ dtm_tx .csn = InvalidGlobalCSN ;
669+ dtm_tx .is_global = false;
608670 }
609- else if (! TransactionIdIsValid (xid ))
671+ else if (TransactionIdIsValid (xid ))
610672 {
611- return ;
612- }
673+ // Commit of local tx
613674
614- SpinLockAcquire (& local -> lock );
615- {
616- bool found ;
617- DtmTransStatus * ts ;
675+ TransactionId * subxids ;
676+ int nSubxids = xactGetCommittedChildren (& subxids );
618677
619- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
678+ Assert (!GlobalCSNIsNormal (dtm_tx .csn ));
679+ Assert (!TransactionIdIsValid (dtm_tx .xid ));
620680
621- if (found )
681+ if (dtm_tx . is_global )
622682 {
623- Assert (GlobalCSNIsNormal (dtm_tx .csn ));
624- ts -> cid = is_commit ? dtm_tx .csn : AbortedGlobalCSN ;
625-
626- dtm_tx .xid = InvalidTransactionId ;
627- dtm_tx .csn = InvalidGlobalCSN ;
628- }
629- else
630- {
631- Assert (!GlobalCSNIsNormal (dtm_tx .csn ));
632- ts -> cid = is_commit ? dtm_get_cid () : AbortedGlobalCSN ;
633- DtmTransactionListAppend (ts );
683+ Assert (!is_commit );
684+ dtm_tx .is_global = false;
634685 }
635- DtmAdjustSubtransactions (ts );
636- }
637- SpinLockRelease (& local -> lock );
638686
687+ SpinLockAcquire (& local -> lock );
688+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
689+ Assert (!found );
690+ ts -> cid = is_commit ? dtm_get_cid () : AbortedGlobalCSN ;
691+ ts -> nSubxids = nSubxids ;
692+ DtmTransactionListAppend (ts );
693+ DtmAddSubtransactions (ts , subxids , nSubxids );
694+ SpinLockRelease (& local -> lock );
695+ }
639696}
640697
641698/*
@@ -669,59 +726,6 @@ DtmDeserializeTransactionState(void* ctx)
669726}
670727
671728
672- /*
673- * Save state of parepared transaction
674- */
675- void
676- DtmLocalSavePreparedState (DtmCurrentTrans * x )
677- {
678-
679- if (dtm_tx .is_global )
680- {
681- TransactionId * subxids ;
682- TransactionId xid = GetCurrentTransactionId ();
683- int nSubxids = xactGetCommittedChildren (& subxids );
684-
685- SpinLockAcquire (& local -> lock );
686- {
687- DtmTransStatus * ts ;
688- bool found ;
689-
690- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
691- Assert (!found );
692- ts -> cid = InDoubtGlobalCSN ;
693- ts -> nSubxids = nSubxids ;
694- DtmTransactionListAppend (ts );
695- DtmAddSubtransactions (ts , subxids , nSubxids );
696- }
697- SpinLockRelease (& local -> lock );
698- }
699- }
700-
701- /*
702- * Add subtransactions to finished transactions list.
703- * Copy CSN and status of parent transaction.
704- */
705- static void
706- DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids )
707- {
708- int i ;
709-
710- for (i = 0 ; i < nSubxids ; i ++ )
711- {
712- bool found ;
713- DtmTransStatus * sts ;
714-
715- Assert (TransactionIdIsValid (subxids [i ]));
716- sts = (DtmTransStatus * ) hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
717- Assert (!found );
718- sts -> cid = ts -> cid ;
719- sts -> nSubxids = 0 ;
720- DtmTransactionListInsertAfter (ts , sts );
721- }
722- }
723-
724-
725729/*
726730 *
727731 * SQL functions for global snapshot mamagement.
@@ -731,8 +735,7 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
731735Datum
732736pg_global_snaphot_create (PG_FUNCTION_ARGS )
733737{
734- GlobalTransactionId gtid = text_to_cstring (PG_GETARG_TEXT_PP (0 ));
735- cid_t cid = DtmLocalExtend (gtid );
738+ cid_t cid = DtmLocalExtend ();
736739
737740 DTM_TRACE ((stderr , "Backend %d extends transaction %u(%s) to global with cid=%lu\n" , getpid (), dtm_tx .xid , gtid , cid ));
738741 PG_RETURN_INT64 (cid );
@@ -742,10 +745,9 @@ Datum
742745pg_global_snaphot_join (PG_FUNCTION_ARGS )
743746{
744747 cid_t cid = PG_GETARG_INT64 (0 );
745- GlobalTransactionId gtid = text_to_cstring (PG_GETARG_TEXT_PP (1 ));
746748
747749 DTM_TRACE ((stderr , "Backend %d joins transaction %u(%s) with cid=%lu\n" , getpid (), dtm_tx .xid , gtid , cid ));
748- cid = DtmLocalAccess (& dtm_tx , gtid , cid );
750+ cid = DtmLocalAccess (cid );
749751 PG_RETURN_INT64 (cid );
750752}
751753
0 commit comments