1717#include "access/htup_details.h"
1818#include "catalog/pg_user_mapping.h"
1919#include "access/global_snapshot.h"
20+ #include "access/hash.h"
2021#include "access/xact.h"
2122#include "access/xtm.h"
2223#include "access/transam.h"
3334#include "utils/memutils.h"
3435#include "utils/syscache.h"
3536
36- /*
37- * Connection cache hash table entry
38- *
39- * The lookup key in this hash table is the user mapping OID. We use just one
40- * connection per user mapping ID, which ensures that all the scans use the
41- * same snapshot during a query. Using the user mapping OID rather than
42- * the foreign server OID + user OID avoids creating multiple connections when
43- * the public user mapping applies to all user OIDs.
44- *
45- * The "conn" pointer can be NULL if we don't currently have a live connection.
46- * When we do have a connection, xact_depth tracks the current depth of
47- * transactions and subtransactions open on the remote side. We need to issue
48- * commands at the same nesting depth on the remote as we're executing at
49- * ourselves, so that rolling back a subtransaction will kill the right
50- * queries and not the wrong ones.
51- */
52- typedef Oid ConnCacheKey ;
53-
54- struct ConnCacheEntry
55- {
56- ConnCacheKey key ; /* hash key (must be first) */
57- PGconn * conn ; /* connection to foreign server, or NULL */
58- WaitEventSet * wait_set ; /* for data from server ready notifications */
59- /* Remaining fields are invalid when conn is NULL: */
60- int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
61- * one level of subxact open, etc */
62- bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
63- bool have_error ; /* have any subxacts aborted in this xact? */
64- bool changing_xact_state ; /* xact state change in process */
65- bool invalidated ; /* true if reconnect is pending */
66- uint32 server_hashvalue ; /* hash value of foreign server OID */
67- uint32 mapping_hashvalue ; /* hash value of user mapping OID */
68- bool copy_from_started ; /* COPY FROM in progress on this conn */
69- };
70-
7137/*
7238 * Connection cache (initialized on first use)
7339 */
@@ -117,6 +83,35 @@ static bool pgfdw_exec_cleanup_query(ConnCacheEntry *entry, const char *query,
11783 bool ignore_errors );
11884static bool pgfdw_get_cleanup_result (ConnCacheEntry * entry , TimestampTz endtime ,
11985 PGresult * * result );
86+ static void cleanup_dm_prepared (ConnCacheEntry * entry );
87+
88+ /* Adapted from string_hash */
89+ static uint32
90+ char_ptr_hash_fn (const void * key , Size keysize )
91+ {
92+ char * const * keyptr = key ;
93+ return DatumGetUInt32 (hash_any ((const unsigned char * ) (* keyptr ), strlen (* keyptr )));
94+ }
95+
96+ static int
97+ char_ptr_match_fn (const void * key1 , const void * key2 , Size keysize )
98+ {
99+ char * const * keyptr1 = key1 ;
100+ char * const * keyptr2 = key2 ;
101+ return strcmp (* keyptr1 , * keyptr2 );
102+ }
103+
104+ /* Allocate always from top-level, where hashtable lives */
105+ static void *
106+ char_ptr_keycopy_fn (void * dest , const void * src , Size keysize )
107+ {
108+ char * * destptr = dest ;
109+ char * const * srcptr = src ;
110+
111+ * destptr = MemoryContextStrdup (CacheMemoryContext , * srcptr );
112+ return NULL ; /* not used */
113+ }
114+
120115/*
121116 * Get a ConnCacheEntry which can be used to execute queries on the remote PostgreSQL
122117 * server with the user's authorization. A new connection is established
@@ -208,6 +203,7 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
208203 if (entry -> conn == NULL )
209204 {
210205 ForeignServer * server = GetForeignServer (user -> serverid );
206+ HASHCTL ctl ;
211207
212208 /* Reset all transient state fields, to be sure all are clean */
213209 entry -> xact_depth = 0 ;
@@ -226,6 +222,19 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
226222 /* Now try to make the connection */
227223 connect_pg_server (entry , server , user );
228224
225+ /* Create hash table of prepared statemetns for DirectModify */
226+ ctl .keysize = sizeof (char * );
227+ ctl .entrysize = sizeof (DirectModifyPrepStmtHashEnt );
228+ ctl .hash = char_ptr_hash_fn ;
229+ ctl .match = char_ptr_match_fn ;
230+ ctl .keycopy = char_ptr_keycopy_fn ;
231+ ctl .hcxt = CacheMemoryContext ;
232+
233+ entry -> dm_prepared = hash_create ("DirectModify prepared stmts" ,
234+ 16 , & ctl ,
235+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE |
236+ HASH_KEYCOPY | HASH_CONTEXT );
237+
229238 elog (DEBUG3 , "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)" ,
230239 entry -> conn , server -> servername , user -> umid , user -> userid );
231240 }
@@ -364,6 +373,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
364373{
365374 if (entry -> conn != NULL )
366375 {
376+ cleanup_dm_prepared (entry );
377+ hash_destroy (entry -> dm_prepared );
367378 Assert (entry -> wait_set );
368379 FreeWaitEventSet (entry -> wait_set );
369380 entry -> wait_set = NULL ;
@@ -1021,6 +1032,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10211032 entry -> have_error = false;
10221033 }
10231034
1035+ /* We have deallocated all prepared statements */
1036+ cleanup_dm_prepared (entry );
1037+
10241038 /* Disarm changing_xact_state if it all worked. */
10251039 entry -> changing_xact_state = abort_cleanup_failure ;
10261040 break ;
@@ -1118,11 +1132,27 @@ deallocate_prepared_stmts(ConnCacheEntry *entry)
11181132 {
11191133 res = PQexec (entry -> conn , "DEALLOCATE ALL" );
11201134 PQclear (res );
1135+ cleanup_dm_prepared (entry );
11211136 }
11221137 entry -> have_prep_stmt = false;
11231138 entry -> have_error = false;
11241139}
11251140
1141+ static void cleanup_dm_prepared (ConnCacheEntry * entry )
1142+ {
1143+ HASH_SEQ_STATUS scan ;
1144+ DirectModifyPrepStmtHashEnt * prep_stmt_entry ;
1145+
1146+ hash_seq_init (& scan , entry -> dm_prepared );
1147+ while ((prep_stmt_entry = (DirectModifyPrepStmtHashEnt * ) hash_seq_search (& scan )))
1148+ {
1149+ /* save the key to free it */
1150+ char * sql = prep_stmt_entry -> sql ;
1151+ hash_search (entry -> dm_prepared , & prep_stmt_entry -> sql , HASH_REMOVE , NULL );
1152+ pfree (sql );
1153+ }
1154+ }
1155+
11261156/*
11271157 * pgfdw_subxact_callback --- cleanup at subtransaction end.
11281158 */
0 commit comments