1717#include "commands/extension.h"
1818#include "executor/executor.h"
1919#include "fmgr.h"
20+ #include "foreign/foreign.h"
2021#include "libpq/libpq.h"
2122#include "libpq-fe.h"
23+ #include "miscadmin.h"
2224#include "optimizer/planner.h"
25+ #include "pgstat.h"
26+ #include "postgres_fdw.h"
27+ #include "storage/latch.h"
2328#include "tcop/utility.h"
29+ #include "utils/builtins.h"
2430#include "utils/guc.h"
31+ #include "utils/memutils.h"
32+
2533
2634PG_MODULE_MAGIC ;
2735
@@ -33,8 +41,7 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
3341
3442static void HOOK_Utility_injection (PlannedStmt * pstmt , const char * queryString ,
3543 ProcessUtilityContext context , ParamListInfo params ,
36- QueryEnvironment * queryEnv , DestReceiver * dest ,
37- char * completionTag );
44+ DestReceiver * dest , char * completionTag );
3845static void HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags );
3946static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
4047
@@ -44,6 +51,10 @@ char *remote_server_fdwname;
4451static bool ExtensionIsActivated = false;
4552static PGconn * conn = NULL ;
4653
54+ static Oid serverid = InvalidOid ;
55+ static UserMapping * user = NULL ;
56+
57+
4758/*
4859 * Module load/unload callback
4960 */
@@ -80,79 +91,14 @@ ExtensionIsActive(void)
8091
8192 if (
8293 !IsTransactionState () ||
83- !OidIsValid (get_extension_oid ("repeater " , true))
94+ !OidIsValid (get_extension_oid ("pg_repeater " , true))
8495 )
8596 return false;
8697
8798 ExtensionIsActivated = true;
8899 return ExtensionIsActivated ;
89100}
90101
91- #include "miscadmin.h"
92- #include "pgstat.h"
93- #include "storage/latch.h"
94-
95- #include "foreign/foreign.h"
96- #include "postgres_fdw.h"
97-
98- static Oid serverid = InvalidOid ;
99- static UserMapping * user = NULL ;
100-
101- static bool
102- pgfdw_cancel_query (PGconn * conn )
103- {
104- PGcancel * cancel ;
105- char errbuf [256 ];
106- PGresult * result = NULL ;
107-
108- if ((cancel = PQgetCancel (conn )))
109- {
110- if (!PQcancel (cancel , errbuf , sizeof (errbuf )))
111- {
112- ereport (WARNING ,
113- (errcode (ERRCODE_CONNECTION_FAILURE ),
114- errmsg ("could not send cancel request: %s" ,
115- errbuf )));
116- PQfreeCancel (cancel );
117- return false;
118- }
119-
120- PQfreeCancel (cancel );
121- }
122- else
123- elog (FATAL , "Can't get connection cancel descriptor" );
124-
125- PQconsumeInput (conn );
126- PQclear (result );
127-
128- return true;
129- }
130-
131- static void
132- cancelQueryIfNeeded (PGconn * conn , const char * query )
133- {
134- Assert (conn != NULL );
135- Assert (query != NULL );
136-
137- if (PQtransactionStatus (conn ) != PQTRANS_IDLE )
138- {
139- PGresult * res ;
140-
141- printf ("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n" ,
142- PQstatus (conn ),
143- PQtransactionStatus (conn ),
144- PQerrorMessage (conn ));
145-
146- res = PQgetResult (conn );
147-
148- if (PQresultStatus (res ) == PGRES_FATAL_ERROR )
149- Assert (pgfdw_cancel_query (conn ));
150- else
151- pgfdw_get_result (conn , query );
152- }
153-
154- }
155-
156102/*
157103 * We need to send some DML queries for sync database schema to a plan execution
158104 * at a remote instance.
@@ -162,7 +108,6 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
162108 const char * queryString ,
163109 ProcessUtilityContext context ,
164110 ParamListInfo params ,
165- QueryEnvironment * queryEnv ,
166111 DestReceiver * dest ,
167112 char * completionTag )
168113{
@@ -192,6 +137,8 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
192137 case T_VacuumStmt :
193138 break ;
194139 default :
140+ {
141+ PGresult * res ;
195142 if (nodeTag (parsetree ) == T_TransactionStmt )
196143 {
197144 TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
@@ -202,26 +149,23 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
202149 )
203150 break ;
204151 }
205- if (conn )
206- cancelQueryIfNeeded (conn , queryString );
207152 conn = GetConnection (user , true);
208- cancelQueryIfNeeded (conn , queryString );
209153 Assert (conn != NULL );
210154
211- Assert (PQsendQuery (conn , queryString ));
155+ res = PQexec (conn , queryString );
156+ PQclear (res );
157+ }
212158 break ;
213- };
159+ }
214160 }
215161
216162 if (next_ProcessUtility_hook )
217163 (* next_ProcessUtility_hook ) (pstmt , queryString , context , params ,
218- queryEnv , dest , completionTag );
164+ dest , completionTag );
219165 else
220166 standard_ProcessUtility (pstmt , queryString ,
221- context , params , queryEnv ,
167+ context , params ,
222168 dest , completionTag );
223- if (conn )
224- cancelQueryIfNeeded (conn , queryString );
225169}
226170
227171static void
@@ -245,25 +189,47 @@ HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
245189 {
246190 Oid serverid ;
247191 UserMapping * user ;
192+ char * query ,
193+ * query_container ,
194+ * plan ,
195+ * plan_container ;
196+ int qlen , qlen1 ,
197+ plen , plen1 ;
198+ PGresult * res ;
248199
249200 serverid = get_foreign_server_oid (remote_server_fdwname , true);
250201 Assert (OidIsValid (serverid ));
251202
252203 user = GetUserMapping (GetUserId (), serverid );
253204 conn = GetConnection (user , true);
254- cancelQueryIfNeeded (conn , queryDesc -> sourceText );
255205
256- if (PQsendPlan (conn , serialize_plan (queryDesc , eflags )) == 0 )
257- pgfdw_report_error (ERROR , NULL , conn , false, queryDesc -> sourceText );
206+ set_portable_output (true);
207+ plan = nodeToString (queryDesc -> plannedstmt );
208+ set_portable_output (false);
209+ plen = b64_enc_len (plan , strlen (plan ) + 1 );
210+ plan_container = (char * ) palloc0 (plen + 1 );
211+ plen1 = b64_encode (plan , strlen (plan ), plan_container );
212+ Assert (plen > plen1 );
213+
214+ qlen = b64_enc_len (queryDesc -> sourceText , strlen (queryDesc -> sourceText ) + 1 );
215+ query_container = (char * ) palloc0 (qlen + 1 );
216+ qlen1 = b64_encode (queryDesc -> sourceText , strlen (queryDesc -> sourceText ), query_container );
217+ Assert (qlen > qlen1 );
218+
219+ query = palloc0 (qlen + plen + 100 );
220+ sprintf (query , "SELECT public.pg_exec_plan('%s', '%s');" , query_container , plan_container );
221+
222+ res = PQexec (conn , query );
223+ PQclear (res );
224+ pfree (query );
225+ pfree (query_container );
226+ pfree (plan_container );
258227 }
259228}
260229
261230static void
262231HOOK_ExecEnd_injection (QueryDesc * queryDesc )
263232{
264- if (conn )
265- cancelQueryIfNeeded (conn , queryDesc -> sourceText );
266-
267233 if (prev_ExecutorEnd )
268234 prev_ExecutorEnd (queryDesc );
269235 else
0 commit comments