2121#include "miscadmin.h"
2222#include "storage/latch.h"
2323#include "utils/hsearch.h"
24+ #include "utils/inval.h"
2425#include "utils/memutils.h"
2526#include "utils/syscache.h"
2627
@@ -47,11 +48,15 @@ typedef struct ConnCacheEntry
4748{
4849 ConnCacheKey key ; /* hash key (must be first) */
4950 PGconn * conn ; /* connection to foreign server, or NULL */
51+ /* Remaining fields are invalid when conn is NULL: */
5052 int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
5153 * one level of subxact open, etc */
5254 bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
5355 bool have_error ; /* have any subxacts aborted in this xact? */
5456 bool changing_xact_state ; /* xact state change in process */
57+ bool invalidated ; /* true if reconnect is pending */
58+ uint32 server_hashvalue ; /* hash value of foreign server OID */
59+ uint32 mapping_hashvalue ; /* hash value of user mapping OID */
5560} ConnCacheEntry ;
5661
5762/*
@@ -68,6 +73,7 @@ static bool xact_got_connection = false;
6873
6974/* prototypes of private functions */
7075static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
76+ static void disconnect_pg_server (ConnCacheEntry * entry );
7177static void check_conn_params (const char * * keywords , const char * * values );
7278static void configure_remote_session (PGconn * conn );
7379static void do_sql_command (PGconn * conn , const char * sql );
@@ -77,6 +83,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
7783 SubTransactionId mySubid ,
7884 SubTransactionId parentSubid ,
7985 void * arg );
86+ static void pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue );
8087static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry * entry );
8188static bool pgfdw_cancel_query (PGconn * conn );
8289static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
@@ -94,13 +101,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
94101 * will_prep_stmt must be true if caller intends to create any prepared
95102 * statements. Since those don't go away automatically at transaction end
96103 * (not even on error), we need this flag to cue manual cleanup.
97- *
98- * XXX Note that caching connections theoretically requires a mechanism to
99- * detect change of FDW objects to invalidate already established connections.
100- * We could manage that by watching for invalidation events on the relevant
101- * syscaches. For the moment, though, it's not clear that this would really
102- * be useful and not mere pedantry. We could not flush any active connections
103- * mid-transaction anyway.
104104 */
105105PGconn *
106106GetConnection (UserMapping * user , bool will_prep_stmt )
@@ -129,6 +129,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
129129 */
130130 RegisterXactCallback (pgfdw_xact_callback , NULL );
131131 RegisterSubXactCallback (pgfdw_subxact_callback , NULL );
132+ CacheRegisterSyscacheCallback (FOREIGNSERVEROID ,
133+ pgfdw_inval_callback , (Datum ) 0 );
134+ CacheRegisterSyscacheCallback (USERMAPPINGOID ,
135+ pgfdw_inval_callback , (Datum ) 0 );
132136 }
133137
134138 /* Set flag that we did GetConnection during the current transaction */
@@ -143,17 +147,27 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
143147 entry = hash_search (ConnectionHash , & key , HASH_ENTER , & found );
144148 if (!found )
145149 {
146- /* initialize new hashtable entry (key is already filled in) */
150+ /*
151+ * We need only clear "conn" here; remaining fields will be filled
152+ * later when "conn" is set.
153+ */
147154 entry -> conn = NULL ;
148- entry -> xact_depth = 0 ;
149- entry -> have_prep_stmt = false;
150- entry -> have_error = false;
151- entry -> changing_xact_state = false;
152155 }
153156
154157 /* Reject further use of connections which failed abort cleanup. */
155158 pgfdw_reject_incomplete_xact_state_change (entry );
156159
160+ /*
161+ * If the connection needs to be remade due to invalidation, disconnect as
162+ * soon as we're out of all transactions.
163+ */
164+ if (entry -> conn != NULL && entry -> invalidated && entry -> xact_depth == 0 )
165+ {
166+ elog (DEBUG3 , "closing connection %p for option changes to take effect" ,
167+ entry -> conn );
168+ disconnect_pg_server (entry );
169+ }
170+
157171 /*
158172 * We don't check the health of cached connection here, because it would
159173 * require some overhead. Broken connection will be detected when the
@@ -163,15 +177,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
163177 /*
164178 * If cache entry doesn't have a connection, we have to establish a new
165179 * connection. (If connect_pg_server throws an error, the cache entry
166- * will be left in a valid empty state.)
180+ * will remain in a valid empty state, ie conn == NULL .)
167181 */
168182 if (entry -> conn == NULL )
169183 {
170184 ForeignServer * server = GetForeignServer (user -> serverid );
171185
172- entry -> xact_depth = 0 ; /* just to be sure */
186+ /* Reset all transient state fields, to be sure all are clean */
187+ entry -> xact_depth = 0 ;
173188 entry -> have_prep_stmt = false;
174189 entry -> have_error = false;
190+ entry -> changing_xact_state = false;
191+ entry -> invalidated = false;
192+ entry -> server_hashvalue =
193+ GetSysCacheHashValue1 (FOREIGNSERVEROID ,
194+ ObjectIdGetDatum (server -> serverid ));
195+ entry -> mapping_hashvalue =
196+ GetSysCacheHashValue1 (USERMAPPINGOID ,
197+ ObjectIdGetDatum (user -> umid ));
198+
199+ /* Now try to make the connection */
175200 entry -> conn = connect_pg_server (server , user );
176201
177202 elog (DEBUG3 , "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)" ,
@@ -285,6 +310,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
285310 return conn ;
286311}
287312
313+ /*
314+ * Disconnect any open connection for a connection cache entry.
315+ */
316+ static void
317+ disconnect_pg_server (ConnCacheEntry * entry )
318+ {
319+ if (entry -> conn != NULL )
320+ {
321+ PQfinish (entry -> conn );
322+ entry -> conn = NULL ;
323+ }
324+ }
325+
288326/*
289327 * For non-superusers, insist that the connstr specify a password. This
290328 * prevents a password from being picked up from .pgpass, a service file,
@@ -786,9 +824,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
786824 entry -> changing_xact_state )
787825 {
788826 elog (DEBUG3 , "discarding connection %p" , entry -> conn );
789- PQfinish (entry -> conn );
790- entry -> conn = NULL ;
791- entry -> changing_xact_state = false;
827+ disconnect_pg_server (entry );
792828 }
793829 }
794830
@@ -905,6 +941,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
905941 }
906942}
907943
944+ /*
945+ * Connection invalidation callback function
946+ *
947+ * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
948+ * mark connections depending on that entry as needing to be remade.
949+ * We can't immediately destroy them, since they might be in the midst of
950+ * a transaction, but we'll remake them at the next opportunity.
951+ *
952+ * Although most cache invalidation callbacks blow away all the related stuff
953+ * regardless of the given hashvalue, connections are expensive enough that
954+ * it's worth trying to avoid that.
955+ *
956+ * NB: We could avoid unnecessary disconnection more strictly by examining
957+ * individual option values, but it seems too much effort for the gain.
958+ */
959+ static void
960+ pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue )
961+ {
962+ HASH_SEQ_STATUS scan ;
963+ ConnCacheEntry * entry ;
964+
965+ Assert (cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID );
966+
967+ /* ConnectionHash must exist already, if we're registered */
968+ hash_seq_init (& scan , ConnectionHash );
969+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
970+ {
971+ /* Ignore invalid entries */
972+ if (entry -> conn == NULL )
973+ continue ;
974+
975+ /* hashvalue == 0 means a cache reset, must clear all state */
976+ if (hashvalue == 0 ||
977+ (cacheid == FOREIGNSERVEROID &&
978+ entry -> server_hashvalue == hashvalue ) ||
979+ (cacheid == USERMAPPINGOID &&
980+ entry -> mapping_hashvalue == hashvalue ))
981+ entry -> invalidated = true;
982+ }
983+ }
984+
908985/*
909986 * Raise an error if the given connection cache entry is marked as being
910987 * in the middle of an xact state change. This should be called at which no
@@ -922,9 +999,14 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
922999 Form_pg_user_mapping umform ;
9231000 ForeignServer * server ;
9241001
925- if (!entry -> changing_xact_state )
1002+ /* nothing to do for inactive entries and entries of sane state */
1003+ if (entry -> conn == NULL || !entry -> changing_xact_state )
9261004 return ;
9271005
1006+ /* make sure this entry is inactive */
1007+ disconnect_pg_server (entry );
1008+
1009+ /* find server name to be shown in the message below */
9281010 tup = SearchSysCache1 (USERMAPPINGOID ,
9291011 ObjectIdGetDatum (entry -> key ));
9301012 if (!HeapTupleIsValid (tup ))
0 commit comments