1717#include "access/htup_details.h"
1818#include "catalog/pg_user_mapping.h"
1919#include "access/xact.h"
20+ #include "access/xtm.h"
21+ #include "access/transam.h"
2022#include "mb/pg_wchar.h"
2123#include "miscadmin.h"
2224#include "pgstat.h"
@@ -72,12 +74,18 @@ static unsigned int prep_stmt_number = 0;
7274/* tracks whether any work is needed in callback functions */
7375static bool xact_got_connection = false;
7476
77+ typedef long long csn_t ;
78+ static csn_t currentGlobalTransactionId = 0 ;
79+ static int currentLocalTransactionId = 0 ;
80+ static PGconn * currentConnection = NULL ;
81+
7582/* prototypes of private functions */
7683static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
7784static void disconnect_pg_server (ConnCacheEntry * entry );
7885static void check_conn_params (const char * * keywords , const char * * values , UserMapping * user );
7986static void configure_remote_session (PGconn * conn );
8087static void do_sql_command (PGconn * conn , const char * sql );
88+ static void do_sql_send_command (PGconn * conn , const char * sql );
8189static void begin_remote_xact (ConnCacheEntry * entry );
8290static void pgfdw_xact_callback (XactEvent event , void * arg );
8391static void pgfdw_subxact_callback (SubXactEvent event ,
@@ -403,6 +411,19 @@ do_sql_command(PGconn *conn, const char *sql)
403411 PQclear (res );
404412}
405413
414+ static void
415+ do_sql_send_command (PGconn * conn , const char * sql )
416+ {
417+ if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK )
418+ {
419+ PGresult * res = PQgetResult (conn );
420+
421+ elog (WARNING , "Failed to send command %s" , sql );
422+ pgfdw_report_error (ERROR , res , conn , true, sql );
423+ PQclear (res );
424+ }
425+ }
426+
406427/*
407428 * Start remote transaction or subtransaction, if needed.
408429 *
@@ -417,15 +438,26 @@ static void
417438begin_remote_xact (ConnCacheEntry * entry )
418439{
419440 int curlevel = GetCurrentTransactionNestLevel ();
441+ PGresult * res ;
442+
420443
421444 /* Start main transaction if we haven't yet */
422445 if (entry -> xact_depth <= 0 )
423446 {
447+ TransactionId gxid = GetTransactionManager ()-> GetGlobalTransactionId ();
424448 const char * sql ;
425449
426450 elog (DEBUG3 , "starting remote transaction on connection %p" ,
427451 entry -> conn );
428452
453+ if (TransactionIdIsValid (gxid ))
454+ {
455+ char stmt [64 ];
456+ snprintf (stmt , sizeof (stmt ), "select public.dtm_join_transaction(%d)" , gxid );
457+ res = PQexec (entry -> conn , stmt );
458+ PQclear (res );
459+ }
460+
429461 if (IsolationIsSerializable ())
430462 sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE" ;
431463 else
@@ -434,6 +466,41 @@ begin_remote_xact(ConnCacheEntry *entry)
434466 do_sql_command (entry -> conn , sql );
435467 entry -> xact_depth = 1 ;
436468 entry -> changing_xact_state = false;
469+
470+ if (UseTsDtmTransactions )
471+ {
472+ if (currentConnection == NULL )
473+ {
474+ currentConnection = entry -> conn ;
475+ }
476+ else if (entry -> conn != currentConnection )
477+ {
478+ if (!currentGlobalTransactionId )
479+ {
480+ char * resp ;
481+ res = PQexec (currentConnection , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
482+ MyProcPid , ++ currentLocalTransactionId ));
483+
484+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
485+ {
486+ pgfdw_report_error (ERROR , res , currentConnection , true, sql );
487+ }
488+ resp = PQgetvalue (res , 0 , 0 );
489+ if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & currentGlobalTransactionId ) != 1 )
490+ {
491+ pgfdw_report_error (ERROR , res , currentConnection , true, sql );
492+ }
493+ PQclear (res );
494+ }
495+ res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
496+
497+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
498+ {
499+ pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
500+ }
501+ PQclear (res );
502+ }
503+ }
437504 }
438505
439506 /*
@@ -643,6 +710,78 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
643710 PQclear (res );
644711}
645712
713+ typedef bool (* DtmCommandResultHandler ) (PGresult * result , void * arg );
714+
715+ static bool
716+ RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
717+ {
718+ HASH_SEQ_STATUS scan ;
719+ ConnCacheEntry * entry ;
720+ bool allOk = true;
721+
722+ hash_seq_init (& scan , ConnectionHash );
723+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
724+ {
725+ if (entry -> xact_depth > 0 )
726+ {
727+ do_sql_send_command (entry -> conn , sql );
728+ }
729+ }
730+
731+ hash_seq_init (& scan , ConnectionHash );
732+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
733+ {
734+ if (entry -> xact_depth > 0 )
735+ {
736+ PGresult * result = PQgetResult (entry -> conn );
737+
738+ if (PQresultStatus (result ) != expectedStatus || (handler && !handler (result , arg )))
739+ {
740+ elog (WARNING , "Failed command %s: status=%d, expected status=%d" , sql , PQresultStatus (result ), expectedStatus );
741+ pgfdw_report_error (ERROR , result , entry -> conn , true, sql );
742+ allOk = false;
743+ }
744+ PQclear (result );
745+ PQgetResult (entry -> conn ); /* consume NULL result */
746+ }
747+ }
748+ return allOk ;
749+ }
750+
751+ static bool
752+ RunDtmCommand (char const * sql )
753+ {
754+ return RunDtmStatement (sql , PGRES_COMMAND_OK , NULL , NULL );
755+ }
756+
757+ static bool
758+ RunDtmFunction (char const * sql )
759+ {
760+ return RunDtmStatement (sql , PGRES_TUPLES_OK , NULL , NULL );
761+ }
762+
763+
764+ static bool
765+ DtmMaxCSN (PGresult * result , void * arg )
766+ {
767+ char * resp = PQgetvalue (result , 0 , 0 );
768+ csn_t * maxCSN = (csn_t * ) arg ;
769+ csn_t csn = 0 ;
770+
771+ if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & csn ) != 1 )
772+ {
773+ return false;
774+ }
775+ else
776+ {
777+ if (* maxCSN < csn )
778+ {
779+ * maxCSN = csn ;
780+ }
781+ return true;
782+ }
783+ }
784+
646785/*
647786 * pgfdw_xact_callback --- cleanup at main-transaction end.
648787 */
@@ -652,10 +791,55 @@ pgfdw_xact_callback(XactEvent event, void *arg)
652791 HASH_SEQ_STATUS scan ;
653792 ConnCacheEntry * entry ;
654793
794+ /* Do nothing for this events */
795+ switch (event )
796+ {
797+ case XACT_EVENT_START :
798+ case XACT_EVENT_COMMIT_PREPARED :
799+ case XACT_EVENT_ABORT_PREPARED :
800+ return ;
801+ default :
802+ break ;
803+ }
804+
655805 /* Quick exit if no connections were touched in this transaction. */
656806 if (!xact_got_connection )
657807 return ;
658808
809+ if (currentGlobalTransactionId != 0 )
810+ {
811+ switch (event )
812+ {
813+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
814+ case XACT_EVENT_PRE_COMMIT :
815+ {
816+ csn_t maxCSN = 0 ;
817+
818+ if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
819+ MyProcPid , currentLocalTransactionId )) ||
820+ !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
821+ MyProcPid , currentLocalTransactionId )) ||
822+ !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
823+ MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
824+ !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
825+ MyProcPid , currentLocalTransactionId , maxCSN )) ||
826+ !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
827+ MyProcPid , currentLocalTransactionId )))
828+ {
829+ RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
830+ MyProcPid , currentLocalTransactionId ));
831+ ereport (ERROR ,
832+ (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
833+ errmsg ("transaction was aborted at one of the shards" )));
834+ break ;
835+ }
836+ return ;
837+ }
838+ default :
839+ break ;
840+ }
841+ }
842+
659843 /*
660844 * Scan all connection cache entries to find open remote transactions, and
661845 * close them.
@@ -689,10 +873,34 @@ pgfdw_xact_callback(XactEvent event, void *arg)
689873 pgfdw_reject_incomplete_xact_state_change (entry );
690874
691875 /* Commit all remote transactions during pre-commit */
692- entry -> changing_xact_state = true;
693- do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
694- entry -> changing_xact_state = false;
876+ do_sql_send_command (entry -> conn , "COMMIT TRANSACTION" );
877+ continue ;
878+
879+ case XACT_EVENT_PRE_PREPARE :
880+
881+ /*
882+ * We disallow remote transactions that modified anything,
883+ * since it's not very reasonable to hold them open until
884+ * the prepared transaction is committed. For the moment,
885+ * throw error unconditionally; later we might allow
886+ * read-only cases. Note that the error will cause us to
887+ * come right back here with event == XACT_EVENT_ABORT, so
888+ * we'll clean up the connection state at that point.
889+ */
890+ ereport (ERROR ,
891+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
892+ errmsg ("cannot prepare a transaction that modified remote tables" )));
893+ break ;
695894
895+ case XACT_EVENT_PARALLEL_COMMIT :
896+ case XACT_EVENT_COMMIT :
897+ case XACT_EVENT_PREPARE :
898+ if (!currentGlobalTransactionId )
899+ {
900+ entry -> changing_xact_state = true;
901+ do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
902+ entry -> changing_xact_state = false;
903+ }
696904 /*
697905 * If there were any errors in subtransactions, and we
698906 * made prepared statements, do a DEALLOCATE ALL to make
@@ -716,27 +924,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
716924 entry -> have_prep_stmt = false;
717925 entry -> have_error = false;
718926 break ;
719- case XACT_EVENT_PRE_PREPARE :
720927
721- /*
722- * We disallow remote transactions that modified anything,
723- * since it's not very reasonable to hold them open until
724- * the prepared transaction is committed. For the moment,
725- * throw error unconditionally; later we might allow
726- * read-only cases. Note that the error will cause us to
727- * come right back here with event == XACT_EVENT_ABORT, so
728- * we'll clean up the connection state at that point.
729- */
730- ereport (ERROR ,
731- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
732- errmsg ("cannot prepare a transaction that modified remote tables" )));
733- break ;
734- case XACT_EVENT_PARALLEL_COMMIT :
735- case XACT_EVENT_COMMIT :
736- case XACT_EVENT_PREPARE :
737- /* Pre-commit should have closed the open transaction */
738- elog (ERROR , "missed cleaning up connection during pre-commit" );
739- break ;
740928 case XACT_EVENT_PARALLEL_ABORT :
741929 case XACT_EVENT_ABORT :
742930
@@ -800,6 +988,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
800988 /* Disarm changing_xact_state if it all worked. */
801989 entry -> changing_xact_state = abort_cleanup_failure ;
802990 break ;
991+
992+ case XACT_EVENT_START :
993+ case XACT_EVENT_COMMIT_PREPARED :
994+ case XACT_EVENT_ABORT_PREPARED :
995+ break ;
803996 }
804997 }
805998
@@ -818,16 +1011,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
8181011 disconnect_pg_server (entry );
8191012 }
8201013 }
1014+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT )
1015+ {
1016+ /*
1017+ * Regardless of the event type, we can now mark ourselves as out of
1018+ * the transaction. (Note: if we are here during PRE_COMMIT or
1019+ * PRE_PREPARE, this saves a useless scan of the hashtable during
1020+ * COMMIT or PREPARE.)
1021+ */
1022+ xact_got_connection = false;
8211023
822- /*
823- * Regardless of the event type, we can now mark ourselves as out of the
824- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
825- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
826- */
827- xact_got_connection = false;
1024+ /* Also reset cursor numbering for next transaction */
1025+ cursor_number = 0 ;
8281026
829- /* Also reset cursor numbering for next transaction */
830- cursor_number = 0 ;
1027+ currentGlobalTransactionId = 0 ;
1028+ currentConnection = NULL ;
1029+ }
8311030}
8321031
8331032/*
0 commit comments