@@ -64,7 +64,7 @@ static bool xact_got_connection = false;
6464
6565typedef long long csn_t ;
6666static csn_t currentGlobalTransactionId = 0 ;
67- static int currentLocalTransactionId = 0 ;
67+ static int currentLocalTransactionId = 0 ;
6868
6969/* prototypes of private functions */
7070static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
@@ -367,8 +367,10 @@ do_sql_command(PGconn *conn, const char *sql)
367367static void
368368do_sql_send_command (PGconn * conn , const char * sql )
369369{
370- if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK ) {
371- PGresult * res = PQgetResult (conn );
370+ if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK )
371+ {
372+ PGresult * res = PQgetResult (conn );
373+
372374 elog (WARNING , "Failed to send command %s" , sql );
373375 pgfdw_report_error (ERROR , res , conn , true, sql );
374376 PQclear (res );
@@ -378,8 +380,10 @@ do_sql_send_command(PGconn *conn, const char *sql)
378380static void
379381do_sql_wait_command (PGconn * conn , const char * sql )
380382{
381- PGresult * res ;
382- while ((res = PQgetResult (conn )) != NULL ) {
383+ PGresult * res ;
384+
385+ while ((res = PQgetResult (conn )) != NULL )
386+ {
383387 if (PQresultStatus (res ) != PGRES_COMMAND_OK )
384388 pgfdw_report_error (ERROR , res , conn , true, sql );
385389 PQclear (res );
@@ -410,9 +414,10 @@ begin_remote_xact(ConnCacheEntry *entry)
410414 elog (DEBUG3 , "starting remote transaction on connection %p" ,
411415 entry -> conn );
412416
413- if (TransactionIdIsValid (gxid )) {
414- char stmt [64 ];
415- PGresult * res ;
417+ if (TransactionIdIsValid (gxid ))
418+ {
419+ char stmt [64 ];
420+ PGresult * res ;
416421
417422 snprintf (stmt , sizeof (stmt ), "select public.dtm_join_transaction(%d)" , gxid );
418423 res = PQexec (entry -> conn , stmt );
@@ -425,14 +430,15 @@ begin_remote_xact(ConnCacheEntry *entry)
425430 sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ" ;
426431 do_sql_command (entry -> conn , sql );
427432 entry -> xact_depth = 1 ;
428- if (UseTsDtmTransactions )
433+ if (UseTsDtmTransactions )
429434 {
430435 if (!currentGlobalTransactionId )
431436 {
432- PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
433- MyProcPid , ++ currentLocalTransactionId ));
434- char * resp ;
435- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
437+ PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
438+ MyProcPid , ++ currentLocalTransactionId ));
439+ char * resp ;
440+
441+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
436442 {
437443 pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
438444 }
@@ -442,13 +448,16 @@ begin_remote_xact(ConnCacheEntry *entry)
442448 pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
443449 }
444450 PQclear (res );
445- } else {
446- PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
447- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
451+ }
452+ else
453+ {
454+ PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
455+
456+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
448457 {
449458 pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
450459 }
451- PQclear (res );
460+ PQclear (res );
452461 }
453462 }
454463 }
@@ -576,13 +585,14 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
576585 PQclear (res );
577586}
578587
579- typedef bool (* DtmCommandResultHandler )(PGresult * result , void * arg );
588+ typedef bool (* DtmCommandResultHandler ) (PGresult * result , void * arg );
580589
581- static bool RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
590+ static bool
591+ RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
582592{
583593 HASH_SEQ_STATUS scan ;
584594 ConnCacheEntry * entry ;
585- bool allOk = true;
595+ bool allOk = true;
586596
587597 hash_seq_init (& scan , ConnectionHash );
588598 while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
@@ -598,40 +608,47 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
598608 {
599609 if (entry -> xact_depth > 0 )
600610 {
601- PGresult * result = PQgetResult (entry -> conn );
611+ PGresult * result = PQgetResult (entry -> conn );
612+
602613 if (PQresultStatus (result ) != expectedStatus || (handler && !handler (result , arg )))
603614 {
604615 elog (WARNING , "Failed command %s: status=%d, expected status=%d" , sql , PQresultStatus (result ), expectedStatus );
605616 pgfdw_report_error (ERROR , result , entry -> conn , true, sql );
606617 allOk = false;
607618 }
608619 PQclear (result );
609- PQgetResult (entry -> conn ); /* consume NULL result */
620+ PQgetResult (entry -> conn ); /* consume NULL result */
610621 }
611622 }
612623 return allOk ;
613624}
614625
615- static bool RunDtmCommand (char const * sql )
626+ static bool
627+ RunDtmCommand (char const * sql )
616628{
617629 return RunDtmStatement (sql , PGRES_COMMAND_OK , NULL , NULL );
618630}
619631
620- static bool RunDtmFunction (char const * sql )
632+ static bool
633+ RunDtmFunction (char const * sql )
621634{
622635 return RunDtmStatement (sql , PGRES_TUPLES_OK , NULL , NULL );
623636}
624637
625638
626- static bool DtmMaxCSN (PGresult * result , void * arg )
639+ static bool
640+ DtmMaxCSN (PGresult * result , void * arg )
627641{
628- char * resp = PQgetvalue (result , 0 , 0 );
629- csn_t * maxCSN = (csn_t * )arg ;
630- csn_t csn = 0 ;
642+ char * resp = PQgetvalue (result , 0 , 0 );
643+ csn_t * maxCSN = (csn_t * ) arg ;
644+ csn_t csn = 0 ;
645+
631646 if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & csn ) != 1 )
632647 {
633648 return false;
634- } else {
649+ }
650+ else
651+ {
635652 if (* maxCSN < csn )
636653 {
637654 * maxCSN = csn ;
@@ -657,35 +674,36 @@ pgfdw_xact_callback(XactEvent event, void *arg)
657674 {
658675 switch (event )
659676 {
660- case XACT_EVENT_PARALLEL_PRE_COMMIT :
661- case XACT_EVENT_PRE_COMMIT :
662- {
663- csn_t maxCSN = 0 ;
664-
665- if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
666- MyProcPid , currentLocalTransactionId )) ||
667- !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
668- MyProcPid , currentLocalTransactionId )) ||
669- !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
670- MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
671- !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
672- MyProcPid , currentLocalTransactionId , maxCSN )) ||
673- !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
674- MyProcPid , currentLocalTransactionId )))
675- {
676- RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
677- MyProcPid , currentLocalTransactionId ));
678- ereport (ERROR ,
679- (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
680- errmsg ("transaction was aborted at one of the shards" )));
681- break ;
682- }
683- return ;
684- }
685- default :
686- break ;
677+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
678+ case XACT_EVENT_PRE_COMMIT :
679+ {
680+ csn_t maxCSN = 0 ;
681+
682+ if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
683+ MyProcPid , currentLocalTransactionId )) ||
684+ !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
685+ MyProcPid , currentLocalTransactionId )) ||
686+ !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
687+ MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
688+ !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
689+ MyProcPid , currentLocalTransactionId , maxCSN )) ||
690+ !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
691+ MyProcPid , currentLocalTransactionId )))
692+ {
693+ RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
694+ MyProcPid , currentLocalTransactionId ));
695+ ereport (ERROR ,
696+ (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
697+ errmsg ("transaction was aborted at one of the shards" )));
698+ break ;
699+ }
700+ return ;
701+ }
702+ default :
703+ break ;
687704 }
688705 }
706+
689707 /*
690708 * Scan all connection cache entries to find open remote transactions, and
691709 * close them.
@@ -694,27 +712,28 @@ pgfdw_xact_callback(XactEvent event, void *arg)
694712 while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
695713 {
696714 PGresult * res ;
697-
715+
698716 /* Ignore cache entry if no open connection right now */
699717 if (entry -> conn == NULL )
700718 continue ;
701-
719+
702720 /* If it has an open remote transaction, try to close it */
703721 if (entry -> xact_depth > 0 )
704722 {
705723 elog (DEBUG3 , "closing remote transaction on connection %p event %d" ,
706724 entry -> conn , event );
707-
725+
708726 switch (event )
709727 {
710- case XACT_EVENT_PARALLEL_PRE_COMMIT :
711- case XACT_EVENT_PRE_COMMIT :
712- /* Commit all remote transactions during pre-commit */
728+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
729+ case XACT_EVENT_PRE_COMMIT :
730+ /* Commit all remote transactions during pre-commit */
713731 do_sql_send_command (entry -> conn , "COMMIT TRANSACTION" );
714732 continue ;
715-
716- case XACT_EVENT_PRE_PREPARE :
717- /*
733+
734+ case XACT_EVENT_PRE_PREPARE :
735+
736+ /*
718737 * We disallow remote transactions that modified anything,
719738 * since it's not very reasonable to hold them open until
720739 * the prepared transaction is committed. For the moment,
@@ -723,18 +742,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
723742 * come right back here with event == XACT_EVENT_ABORT, so
724743 * we'll clean up the connection state at that point.
725744 */
726- ereport (ERROR ,
745+ ereport (ERROR ,
727746 (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
728747 errmsg ("cannot prepare a transaction that modified remote tables" )));
729748 break ;
730749
731- case XACT_EVENT_PARALLEL_COMMIT :
750+ case XACT_EVENT_PARALLEL_COMMIT :
732751 case XACT_EVENT_COMMIT :
733- case XACT_EVENT_PREPARE :
734- if (!currentGlobalTransactionId )
752+ case XACT_EVENT_PREPARE :
753+ if (!currentGlobalTransactionId )
735754 {
736755 do_sql_wait_command (entry -> conn , "COMMIT TRANSACTION" );
737756 }
757+
738758 /*
739759 * If there were any errors in subtransactions, and we
740760 * made prepared statements, do a DEALLOCATE ALL to make
@@ -758,11 +778,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
758778 entry -> have_prep_stmt = false;
759779 entry -> have_error = false;
760780 break ;
761-
762- case XACT_EVENT_PARALLEL_ABORT :
763- case XACT_EVENT_ABORT :
764- /* Assume we might have lost track of prepared statements */
765- entry -> have_error = true;
781+
782+ case XACT_EVENT_PARALLEL_ABORT :
783+ case XACT_EVENT_ABORT :
784+ /* Assume we might have lost track of prepared statements */
785+ entry -> have_error = true;
766786 /* If we're aborting, abort all remote transactions too */
767787 res = PQexec (entry -> conn , "ABORT TRANSACTION" );
768788 /* Note: can't throw ERROR, it would be infinite loop */
@@ -782,11 +802,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
782802 entry -> have_error = false;
783803 }
784804 break ;
785-
786- case XACT_EVENT_START :
787- case XACT_EVENT_ABORT_PREPARED :
788- case XACT_EVENT_COMMIT_PREPARED :
789- break ;
805+
806+ case XACT_EVENT_START :
807+ case XACT_EVENT_ABORT_PREPARED :
808+ case XACT_EVENT_COMMIT_PREPARED :
809+ break ;
790810 }
791811 }
792812 /* Reset state to show we're out of a transaction */
@@ -804,11 +824,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
804824 entry -> conn = NULL ;
805825 }
806826 }
807- if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT ) {
827+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT )
828+ {
808829 /*
809- * Regardless of the event type, we can now mark ourselves as out of the
810- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
811- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
830+ * Regardless of the event type, we can now mark ourselves as out of
831+ * the transaction. (Note: if we are here during PRE_COMMIT or
832+ * PRE_PREPARE, this saves a useless scan of the hashtable during
833+ * COMMIT or PREPARE.)
812834 */
813835 xact_got_connection = false;
814836
0 commit comments