@@ -49,7 +49,6 @@ typedef uint64 timestamp_t;
4949typedef struct DtmTransStatus
5050{
5151 TransactionId xid ;
52- XidStatus status ;
5352 int nSubxids ;
5453 cid_t cid ; /* CSN */
5554 struct DtmTransStatus * next ;/* pointer to next element in finished
@@ -292,8 +291,7 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
292291
293292 for (i = 0 ; i < nSubxids ; i ++ ) {
294293 sts = sts -> next ;
295- sts -> status = ts -> status ;
296- Assert (sts -> cid == ts -> cid );
294+ sts -> cid = ts -> cid ;
297295 }
298296}
299297
@@ -380,14 +378,14 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
380378
381379 if (ts != NULL )
382380 {
383- if (ts -> cid > dtm_tx .snapshot )
381+ if (GlobalCSNIsNormal ( ts -> cid ) && ts -> cid > dtm_tx .snapshot )
384382 {
385383 DTM_TRACE ((stderr , "%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n" ,
386384 getpid (), xid , ts -> cid , dtm_tx .snapshot ));
387385 SpinLockRelease (& local -> lock );
388386 return true;
389387 }
390- if (ts -> status == TRANSACTION_STATUS_UNKNOWN )
388+ if (ts -> cid == InDoubtGlobalCSN )
391389 {
392390 DTM_TRACE ((stderr , "%d: wait for in-doubt transaction %u in snapshot %lu\n" , getpid (), xid , dtm_tx .snapshot ));
393391 SpinLockRelease (& local -> lock );
@@ -400,7 +398,10 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
400398 }
401399 else
402400 {
403- bool invisible = ts -> status == TRANSACTION_STATUS_ABORTED ;
401+ bool invisible = ts -> cid == AbortedGlobalCSN ;
402+
403+ if (!invisible )
404+ Assert (GlobalCSNIsNormal (ts -> cid ));
404405
405406 DTM_TRACE ((stderr , "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n" ,
406407 getpid (), xid , ts -> cid , invisible ? "rollbacked" : "committed" , dtm_tx .snapshot ));
@@ -473,6 +474,9 @@ DtmLocalExtend(GlobalTransactionId gtid)
473474 strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
474475 }
475476 DtmInitGlobalXmin (TransactionXmin );
477+
478+ dtm_tx .is_global = true;
479+
476480 return x -> snapshot ;
477481}
478482
@@ -495,6 +499,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
495499 strncpy (x -> gtid , gtid , MAX_GTID_SIZE );
496500 SpinLockRelease (& local -> lock );
497501
502+ dtm_tx .is_global = true;
503+
498504 if (global_cid < local_cid - DtmVacuumDelay * USEC )
499505 {
500506 elog (ERROR , "Too old snapshot: requested %ld, current %ld" , global_cid , local_cid );
@@ -511,27 +517,34 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
511517void
512518DtmLocalBeginPrepare (GlobalTransactionId gtid )
513519{
514- TransactionId xid = TwoPhaseGetTransactionId (gtid );
515-
516- if (!TransactionIdIsValid (xid ))
517- {
518- // XXX: check that it is global tx with the same xid, XactTopTransactionId?
519- xid = GetCurrentTransactionId ();
520- }
520+ TransactionId xid = GetCurrentTransactionIdIfAny ();
521521
522- SpinLockAcquire ( & local -> lock );
522+ if ( TransactionIdIsValid ( xid )) // XXX: decide based on empty gtid?
523523 {
524+ // inside global 1pc tx
525+ TransactionId * subxids ;
526+ int nSubxids = xactGetCommittedChildren (& subxids );
524527 DtmTransStatus * ts ;
525528 bool found ;
526529
527- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
528- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
529- ts -> cid = dtm_get_cid ();
530- if (!found )
531- ts -> nSubxids = 0 ;
532- DtmAdjustSubtransactions (ts );
530+ Assert (dtm_tx .is_global ); // XXX: change to error
531+
532+ SpinLockAcquire (& local -> lock );
533+ {
534+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
535+ Assert (!found );
536+ ts -> cid = InDoubtGlobalCSN ;
537+ ts -> nSubxids = nSubxids ;
538+ DtmTransactionListAppend (ts );
539+ DtmAddSubtransactions (ts , subxids , nSubxids );
540+ }
541+ SpinLockRelease (& local -> lock );
533542 }
534- SpinLockRelease (& local -> lock );
543+ else
544+ {
545+ // inside after-prepare fx
546+ }
547+
535548}
536549
537550/*
@@ -559,31 +572,24 @@ DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
559572void
560573DtmLocalEndPrepare (GlobalTransactionId gtid , cid_t cid )
561574{
562- TransactionId xid = TwoPhaseGetTransactionId ( gtid );
575+ TransactionId xid = GetCurrentTransactionIdIfAny ( );
563576
564- if (! TransactionIdIsValid (xid ))
577+ if (TransactionIdIsValid (xid ))
565578 {
566- // XXX: check that it is global tx with the same xid, XactTopTransactionId?
567- xid = GetCurrentTransactionId ();
579+ Assert (dtm_tx .is_global );
568580 }
569-
570- dtm_tx .xid = xid ;
571-
572- SpinLockAcquire (& local -> lock );
581+ else
573582 {
574- DtmTransStatus * ts ;
575- DtmTransId * id ;
576- int i ;
583+ // inside after-prepare fx
584+ xid = TwoPhaseGetTransactionId (gtid );
585+ // Assert(TransactionIdIsValid(xid));
586+ if (!TransactionIdIsValid (xid ))
587+ return ; // global ro tx
588+ }
577589
578- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_FIND , NULL );
579- Assert (ts != NULL );
580- ts -> cid = cid ;
581- DtmAdjustSubtransactions (ts );
582- dtm_sync (cid );
583590
584- DTM_TRACE ((stderr , "Prepare transaction %u(%s) with CSN %lu\n" , id -> xid , gtid , cid ));
585- }
586- SpinLockRelease (& local -> lock );
591+ dtm_tx .xid = xid ;
592+ dtm_tx .csn = cid ;
587593
588594}
589595
@@ -598,7 +604,7 @@ DtmLocalFinish(bool is_commit)
598604
599605 if (x -> gtid [0 ] && finishing_prepared )
600606 {
601- xid = x -> xid ;
607+ xid = dtm_tx . xid ;
602608 }
603609 else if (!TransactionIdIsValid (xid ))
604610 {
@@ -611,19 +617,25 @@ DtmLocalFinish(bool is_commit)
611617 DtmTransStatus * ts ;
612618
613619 ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
614- ts -> status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED ;
615620
616- if (!found )
621+ if (found )
622+ {
623+ Assert (GlobalCSNIsNormal (dtm_tx .csn ));
624+ ts -> cid = is_commit ? dtm_tx .csn : AbortedGlobalCSN ;
625+
626+ dtm_tx .xid = InvalidTransactionId ;
627+ dtm_tx .csn = InvalidGlobalCSN ;
628+ }
629+ else
617630 {
618- ts -> cid = dtm_get_cid ( );
619- ts -> nSubxids = 0 ;
631+ Assert (! GlobalCSNIsNormal ( dtm_tx . csn ) );
632+ ts -> cid = is_commit ? dtm_get_cid () : AbortedGlobalCSN ;
620633 DtmTransactionListAppend (ts );
621634 }
622635 DtmAdjustSubtransactions (ts );
623636 }
624637 SpinLockRelease (& local -> lock );
625638
626- // DtmAdjustOldestXid();
627639}
628640
629641/*
@@ -657,32 +669,14 @@ DtmDeserializeTransactionState(void* ctx)
657669}
658670
659671
660- cid_t
661- DtmGetCsn (TransactionId xid )
662- {
663- cid_t csn = 0 ;
664-
665- SpinLockAcquire (& local -> lock );
666- {
667- DtmTransStatus * ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_FIND , NULL );
668-
669- if (ts != NULL )
670- {
671- csn = ts -> cid ;
672- }
673- }
674- SpinLockRelease (& local -> lock );
675- return csn ;
676- }
677-
678672/*
679673 * Save state of parepared transaction
680674 */
681675void
682676DtmLocalSavePreparedState (DtmCurrentTrans * x )
683677{
684678
685- if (x -> gtid [ 0 ] )
679+ if (dtm_tx . is_global )
686680 {
687681 TransactionId * subxids ;
688682 TransactionId xid = GetCurrentTransactionId ();
@@ -691,10 +685,11 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
691685 SpinLockAcquire (& local -> lock );
692686 {
693687 DtmTransStatus * ts ;
688+ bool found ;
694689
695- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , NULL );
696- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
697- ts -> cid = dtm_get_cid () ;
690+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
691+ Assert (! found ) ;
692+ ts -> cid = InDoubtGlobalCSN ;
698693 ts -> nSubxids = nSubxids ;
699694 DtmTransactionListAppend (ts );
700695 DtmAddSubtransactions (ts , subxids , nSubxids );
@@ -720,7 +715,6 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
720715 Assert (TransactionIdIsValid (subxids [i ]));
721716 sts = (DtmTransStatus * ) hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
722717 Assert (!found );
723- sts -> status = ts -> status ;
724718 sts -> cid = ts -> cid ;
725719 sts -> nSubxids = 0 ;
726720 DtmTransactionListInsertAfter (ts , sts );
0 commit comments