@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
5858 bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
5959 bool have_error ; /* have any subxacts aborted in this xact? */
6060 bool changing_xact_state ; /* xact state change in process */
61+ bool parallel_commit ; /* do we commit (sub)xacts in parallel? */
6162 bool invalidated ; /* true if reconnect is pending */
6263 bool keep_connections ; /* setting value of keep_connections
6364 * server option */
@@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
9293static void disconnect_pg_server (ConnCacheEntry * entry );
9394static void check_conn_params (const char * * keywords , const char * * values , UserMapping * user );
9495static void configure_remote_session (PGconn * conn );
96+ static void do_sql_command_begin (PGconn * conn , const char * sql );
97+ static void do_sql_command_end (PGconn * conn , const char * sql ,
98+ bool consume_input );
9599static void begin_remote_xact (ConnCacheEntry * entry );
96100static void pgfdw_xact_callback (XactEvent event , void * arg );
97101static void pgfdw_subxact_callback (SubXactEvent event ,
@@ -100,13 +104,17 @@ static void pgfdw_subxact_callback(SubXactEvent event,
100104 void * arg );
101105static void pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue );
102106static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry * entry );
107+ static void pgfdw_reset_xact_state (ConnCacheEntry * entry , bool toplevel );
103108static bool pgfdw_cancel_query (PGconn * conn );
104109static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
105110 bool ignore_errors );
106111static bool pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
107112 PGresult * * result , bool * timed_out );
108113static void pgfdw_abort_cleanup (ConnCacheEntry * entry , const char * sql ,
109114 bool toplevel );
115+ static void pgfdw_finish_pre_commit_cleanup (List * pending_entries );
116+ static void pgfdw_finish_pre_subcommit_cleanup (List * pending_entries ,
117+ int curlevel );
110118static bool UserMappingPasswordRequired (UserMapping * user );
111119static bool disconnect_cached_connections (Oid serverid );
112120
@@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
316324 * is changed will be closed and re-made later.
317325 *
318326 * By default, all the connections to any foreign servers are kept open.
327+ *
328+ * Also determine whether to commit (sub)transactions opened on the remote
329+ * server in parallel at (sub)transaction end.
319330 */
320331 entry -> keep_connections = true;
332+ entry -> parallel_commit = false;
321333 foreach (lc , server -> options )
322334 {
323335 DefElem * def = (DefElem * ) lfirst (lc );
324336
325337 if (strcmp (def -> defname , "keep_connections" ) == 0 )
326338 entry -> keep_connections = defGetBoolean (def );
339+ else if (strcmp (def -> defname , "parallel_commit" ) == 0 )
340+ entry -> parallel_commit = defGetBoolean (def );
327341 }
328342
329343 /* Now try to make the connection */
@@ -623,10 +637,30 @@ configure_remote_session(PGconn *conn)
623637void
624638do_sql_command (PGconn * conn , const char * sql )
625639{
626- PGresult * res ;
640+ do_sql_command_begin (conn , sql );
641+ do_sql_command_end (conn , sql , false);
642+ }
627643
644+ static void
645+ do_sql_command_begin (PGconn * conn , const char * sql )
646+ {
628647 if (!PQsendQuery (conn , sql ))
629648 pgfdw_report_error (ERROR , NULL , conn , false, sql );
649+ }
650+
651+ static void
652+ do_sql_command_end (PGconn * conn , const char * sql , bool consume_input )
653+ {
654+ PGresult * res ;
655+
656+ /*
657+ * If requested, consume whatever data is available from the socket.
658+ * (Note that if all data is available, this allows pgfdw_get_result to
659+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
660+ * which would be large compared to the overhead of PQconsumeInput.)
661+ */
662+ if (consume_input && !PQconsumeInput (conn ))
663+ pgfdw_report_error (ERROR , NULL , conn , false, sql );
630664 res = pgfdw_get_result (conn , sql );
631665 if (PQresultStatus (res ) != PGRES_COMMAND_OK )
632666 pgfdw_report_error (ERROR , res , conn , true, sql );
@@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
888922{
889923 HASH_SEQ_STATUS scan ;
890924 ConnCacheEntry * entry ;
925+ List * pending_entries = NIL ;
891926
892927 /* Quick exit if no connections were touched in this transaction. */
893928 if (!xact_got_connection )
@@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
925960
926961 /* Commit all remote transactions during pre-commit */
927962 entry -> changing_xact_state = true;
963+ if (entry -> parallel_commit )
964+ {
965+ do_sql_command_begin (entry -> conn , "COMMIT TRANSACTION" );
966+ pending_entries = lappend (pending_entries , entry );
967+ continue ;
968+ }
928969 do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
929970 entry -> changing_xact_state = false;
930971
@@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9811022 }
9821023
9831024 /* Reset state to show we're out of a transaction */
984- entry -> xact_depth = 0 ;
1025+ pgfdw_reset_xact_state (entry , true);
1026+ }
9851027
986- /*
987- * If the connection isn't in a good idle state, it is marked as
988- * invalid or keep_connections option of its server is disabled, then
989- * discard it to recover. Next GetConnection will open a new
990- * connection.
991- */
992- if (PQstatus (entry -> conn ) != CONNECTION_OK ||
993- PQtransactionStatus (entry -> conn ) != PQTRANS_IDLE ||
994- entry -> changing_xact_state ||
995- entry -> invalidated ||
996- !entry -> keep_connections )
997- {
998- elog (DEBUG3 , "discarding connection %p" , entry -> conn );
999- disconnect_pg_server (entry );
1000- }
1028+ /* If there are any pending connections, finish cleaning them up */
1029+ if (pending_entries )
1030+ {
1031+ Assert (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1032+ event == XACT_EVENT_PRE_COMMIT );
1033+ pgfdw_finish_pre_commit_cleanup (pending_entries );
10011034 }
10021035
10031036 /*
@@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10211054 HASH_SEQ_STATUS scan ;
10221055 ConnCacheEntry * entry ;
10231056 int curlevel ;
1057+ List * pending_entries = NIL ;
10241058
10251059 /* Nothing to do at subxact start, nor after commit. */
10261060 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10631097 /* Commit all remote subtransactions during pre-commit */
10641098 snprintf (sql , sizeof (sql ), "RELEASE SAVEPOINT s%d" , curlevel );
10651099 entry -> changing_xact_state = true;
1100+ if (entry -> parallel_commit )
1101+ {
1102+ do_sql_command_begin (entry -> conn , sql );
1103+ pending_entries = lappend (pending_entries , entry );
1104+ continue ;
1105+ }
10661106 do_sql_command (entry -> conn , sql );
10671107 entry -> changing_xact_state = false;
10681108 }
@@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10761116 }
10771117
10781118 /* OK, we're outta that level of subtransaction */
1079- entry -> xact_depth -- ;
1119+ pgfdw_reset_xact_state (entry , false);
1120+ }
1121+
1122+ /* If there are any pending connections, finish cleaning them up */
1123+ if (pending_entries )
1124+ {
1125+ Assert (event == SUBXACT_EVENT_PRE_COMMIT_SUB );
1126+ pgfdw_finish_pre_subcommit_cleanup (pending_entries , curlevel );
10801127 }
10811128}
10821129
@@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
11691216 server -> servername )));
11701217}
11711218
1219+ /*
1220+ * Reset state to show we're out of a (sub)transaction.
1221+ */
1222+ static void
1223+ pgfdw_reset_xact_state (ConnCacheEntry * entry , bool toplevel )
1224+ {
1225+ if (toplevel )
1226+ {
1227+ /* Reset state to show we're out of a transaction */
1228+ entry -> xact_depth = 0 ;
1229+
1230+ /*
1231+ * If the connection isn't in a good idle state, it is marked as
1232+ * invalid or keep_connections option of its server is disabled, then
1233+ * discard it to recover. Next GetConnection will open a new
1234+ * connection.
1235+ */
1236+ if (PQstatus (entry -> conn ) != CONNECTION_OK ||
1237+ PQtransactionStatus (entry -> conn ) != PQTRANS_IDLE ||
1238+ entry -> changing_xact_state ||
1239+ entry -> invalidated ||
1240+ !entry -> keep_connections )
1241+ {
1242+ elog (DEBUG3 , "discarding connection %p" , entry -> conn );
1243+ disconnect_pg_server (entry );
1244+ }
1245+ }
1246+ else
1247+ {
1248+ /* Reset state to show we're out of a subtransaction */
1249+ entry -> xact_depth -- ;
1250+ }
1251+ }
1252+
11721253/*
11731254 * Cancel the currently-in-progress query (whose query text we do not have)
11741255 * and ignore the result. Returns true if we successfully cancel the query
@@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
14561537 entry -> changing_xact_state = false;
14571538}
14581539
1540+ /*
1541+ * Finish pre-commit cleanup of connections on each of which we've sent a
1542+ * COMMIT command to the remote server.
1543+ */
1544+ static void
1545+ pgfdw_finish_pre_commit_cleanup (List * pending_entries )
1546+ {
1547+ ConnCacheEntry * entry ;
1548+ List * pending_deallocs = NIL ;
1549+ ListCell * lc ;
1550+
1551+ Assert (pending_entries );
1552+
1553+ /*
1554+ * Get the result of the COMMIT command for each of the pending entries
1555+ */
1556+ foreach (lc , pending_entries )
1557+ {
1558+ entry = (ConnCacheEntry * ) lfirst (lc );
1559+
1560+ Assert (entry -> changing_xact_state );
1561+ /*
1562+ * We might already have received the result on the socket, so pass
1563+ * consume_input=true to try to consume it first
1564+ */
1565+ do_sql_command_end (entry -> conn , "COMMIT TRANSACTION" , true);
1566+ entry -> changing_xact_state = false;
1567+
1568+ /* Do a DEALLOCATE ALL in parallel if needed */
1569+ if (entry -> have_prep_stmt && entry -> have_error )
1570+ {
1571+ /* Ignore errors (see notes in pgfdw_xact_callback) */
1572+ if (PQsendQuery (entry -> conn , "DEALLOCATE ALL" ))
1573+ {
1574+ pending_deallocs = lappend (pending_deallocs , entry );
1575+ continue ;
1576+ }
1577+ }
1578+ entry -> have_prep_stmt = false;
1579+ entry -> have_error = false;
1580+
1581+ pgfdw_reset_xact_state (entry , true);
1582+ }
1583+
1584+ /* No further work if no pending entries */
1585+ if (!pending_deallocs )
1586+ return ;
1587+
1588+ /*
1589+ * Get the result of the DEALLOCATE command for each of the pending
1590+ * entries
1591+ */
1592+ foreach (lc , pending_deallocs )
1593+ {
1594+ PGresult * res ;
1595+
1596+ entry = (ConnCacheEntry * ) lfirst (lc );
1597+
1598+ /* Ignore errors (see notes in pgfdw_xact_callback) */
1599+ while ((res = PQgetResult (entry -> conn )) != NULL )
1600+ {
1601+ PQclear (res );
1602+ /* Stop if the connection is lost (else we'll loop infinitely) */
1603+ if (PQstatus (entry -> conn ) == CONNECTION_BAD )
1604+ break ;
1605+ }
1606+ entry -> have_prep_stmt = false;
1607+ entry -> have_error = false;
1608+
1609+ pgfdw_reset_xact_state (entry , true);
1610+ }
1611+ }
1612+
1613+ /*
1614+ * Finish pre-subcommit cleanup of connections on each of which we've sent a
1615+ * RELEASE command to the remote server.
1616+ */
1617+ static void
1618+ pgfdw_finish_pre_subcommit_cleanup (List * pending_entries , int curlevel )
1619+ {
1620+ ConnCacheEntry * entry ;
1621+ char sql [100 ];
1622+ ListCell * lc ;
1623+
1624+ Assert (pending_entries );
1625+
1626+ /*
1627+ * Get the result of the RELEASE command for each of the pending entries
1628+ */
1629+ snprintf (sql , sizeof (sql ), "RELEASE SAVEPOINT s%d" , curlevel );
1630+ foreach (lc , pending_entries )
1631+ {
1632+ entry = (ConnCacheEntry * ) lfirst (lc );
1633+
1634+ Assert (entry -> changing_xact_state );
1635+ /*
1636+ * We might already have received the result on the socket, so pass
1637+ * consume_input=true to try to consume it first
1638+ */
1639+ do_sql_command_end (entry -> conn , sql , true);
1640+ entry -> changing_xact_state = false;
1641+
1642+ pgfdw_reset_xact_state (entry , false);
1643+ }
1644+ }
1645+
14591646/*
14601647 * List active foreign server connections.
14611648 *
0 commit comments