11/*-------------------------------------------------------------------------
22 *
33 * repeater.c
4- * Simple demo for remote plan execution patch.
4+ * Simple demo for remote plan execution patch.
55 *
66 * Transfer query plan to a remote instance and wait for result.
77 * Remote instance parameters (host, port) defines by GUCs.
3434
3535PG_MODULE_MAGIC ;
3636
37- void _PG_init (void );
37+ void _PG_init (void );
3838
39- static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
40- static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
41- static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
39+ static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
40+ static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
41+ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
4242
4343static void HOOK_Utility_injection (PlannedStmt * pstmt , const char * queryString ,
44- ProcessUtilityContext context , ParamListInfo params ,
45- DestReceiver * dest , char * completionTag );
44+ ProcessUtilityContext context , ParamListInfo params ,
45+ DestReceiver * dest , char * completionTag );
4646static void HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags );
4747static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
4848
4949/* Remote instance parameters. */
50- char * remote_server_fdwname ;
50+ char * remote_server_fdwname ;
5151
52- static bool ExtensionIsActivated = false;
53- static PGconn * conn = NULL ;
52+ static bool ExtensionIsActivated = false;
53+ static PGconn * conn = NULL ;
5454
55- static Oid serverid = InvalidOid ;
56- static UserMapping * user = NULL ;
55+ static Oid serverid = InvalidOid ;
56+ static UserMapping * user = NULL ;
5757
5858
5959/*
6363_PG_init (void )
6464{
6565 DefineCustomStringVariable ("repeater.fdwname" ,
66- "Remote host fdw name" ,
67- NULL ,
68- & remote_server_fdwname ,
69- "remoteserv" ,
70- PGC_SIGHUP ,
71- GUC_NOT_IN_SAMPLE ,
72- NULL ,
73- NULL ,
74- NULL );
66+ "Remote host fdw name" ,
67+ NULL ,
68+ & remote_server_fdwname ,
69+ "remoteserv" ,
70+ PGC_SIGHUP ,
71+ GUC_NOT_IN_SAMPLE ,
72+ NULL ,
73+ NULL ,
74+ NULL );
7575
7676 /* ProcessUtility hook */
7777 next_ProcessUtility_hook = ProcessUtility_hook ;
@@ -106,22 +106,22 @@ ExtensionIsActive(void)
106106 */
107107static void
108108HOOK_Utility_injection (PlannedStmt * pstmt ,
109- const char * queryString ,
110- ProcessUtilityContext context ,
111- ParamListInfo params ,
112- DestReceiver * dest ,
113- char * completionTag )
109+ const char * queryString ,
110+ ProcessUtilityContext context ,
111+ ParamListInfo params ,
112+ DestReceiver * dest ,
113+ char * completionTag )
114114{
115- Node * parsetree = pstmt -> utilityStmt ;
115+ Node * parsetree = pstmt -> utilityStmt ;
116116
117117 if (ExtensionIsActive () &&
118118 pstmt -> canSetTag &&
119119 (context != PROCESS_UTILITY_SUBCOMMAND )
120- )
120+ )
121121 {
122122 if (!user )
123123 {
124- MemoryContext oldCxt = MemoryContextSwitchTo (TopMemoryContext );
124+ MemoryContext oldCxt = MemoryContextSwitchTo (TopMemoryContext );
125125
126126 serverid = get_foreign_server_oid (remote_server_fdwname , true);
127127 Assert (OidIsValid (serverid ));
@@ -131,32 +131,33 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
131131 }
132132 switch (nodeTag (parsetree ))
133133 {
134- case T_CopyStmt :
135- case T_CreateExtensionStmt :
136- case T_ExplainStmt :
137- case T_FetchStmt :
138- case T_VacuumStmt :
139- break ;
140- default :
141- {
142- PGresult * res ;
143- if (nodeTag (parsetree ) == T_TransactionStmt )
144- {
145- TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
146-
147- if (
148- // (stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
149- (stmt -> kind != TRANS_STMT_SAVEPOINT )
150- )
151- break ;
152- }
153- conn = GetConnection (user , true);
154- Assert (conn != NULL );
155-
156- res = PQexec (conn , queryString );
157- PQclear (res );
158- }
159- break ;
134+ case T_CopyStmt :
135+ case T_CreateExtensionStmt :
136+ case T_ExplainStmt :
137+ case T_FetchStmt :
138+ case T_VacuumStmt :
139+ break ;
140+ default :
141+ {
142+ PGresult * res ;
143+
144+ if (nodeTag (parsetree ) == T_TransactionStmt )
145+ {
146+ TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
147+
148+ if (
149+ /* (stmt->kind != TRANS_STMT_ROLLBACK_TO) && */
150+ (stmt -> kind != TRANS_STMT_SAVEPOINT )
151+ )
152+ break ;
153+ }
154+ conn = GetConnection (user , true);
155+ Assert (conn != NULL );
156+
157+ res = PQexec (conn , queryString );
158+ PQclear (res );
159+ }
160+ break ;
160161 }
161162 }
162163
@@ -165,68 +166,70 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
165166 dest , completionTag );
166167 else
167168 standard_ProcessUtility (pstmt , queryString ,
168- context , params ,
169- dest , completionTag );
169+ context , params ,
170+ dest , completionTag );
170171}
171172
172173static void
173174HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags )
174175{
175- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
176+ Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
176177
177178 if (prev_ExecutorStart )
178179 prev_ExecutorStart (queryDesc , eflags );
179180 else
180181 standard_ExecutorStart (queryDesc , eflags );
181182
182183 /*
183- * This not fully correct sign for prevent passing each subquery to
184- * the remote instance. Only for demo.
184+ * This not fully correct sign for prevent passing each subquery to the
185+ * remote instance. Only for demo.
185186 */
186- if (ExtensionIsActive () &&
187- queryDesc -> plannedstmt -> canSetTag &&
188- !IsParallelWorker () &&
189- ((parsetree == NULL ) || (nodeTag (parsetree ) != T_CreatedbStmt )) &&
190- !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
191- {
192- Oid serverid ;
193- UserMapping * user ;
194- char * query ,
195- * query_container ,
196- * plan ,
197- * plan_container ;
198- int qlen , qlen1 ,
199- plen , plen1 ;
200- PGresult * res ;
201-
202- serverid = get_foreign_server_oid (remote_server_fdwname , true);
203- Assert (OidIsValid (serverid ));
204-
205- user = GetUserMapping (GetUserId (), serverid );
206- conn = GetConnection (user , true);
207-
208- set_portable_output (true);
209- plan = nodeToString (queryDesc -> plannedstmt );
210- set_portable_output (false);
211- plen = b64_enc_len (plan , strlen (plan ) + 1 );
212- plan_container = (char * ) palloc0 (plen + 1 );
213- plen1 = b64_encode (plan , strlen (plan ), plan_container );
214- Assert (plen > plen1 );
215-
216- qlen = b64_enc_len (queryDesc -> sourceText , strlen (queryDesc -> sourceText ) + 1 );
217- query_container = (char * ) palloc0 (qlen + 1 );
218- qlen1 = b64_encode (queryDesc -> sourceText , strlen (queryDesc -> sourceText ), query_container );
219- Assert (qlen > qlen1 );
220-
221- query = palloc0 (qlen + plen + 100 );
222- sprintf (query , "SELECT public.pg_exec_plan('%s', '%s');" , query_container , plan_container );
223-
224- res = PQexec (conn , query );
225- PQclear (res );
226- pfree (query );
227- pfree (query_container );
228- pfree (plan_container );
229- }
187+ if (ExtensionIsActive () &&
188+ queryDesc -> plannedstmt -> canSetTag &&
189+ !IsParallelWorker () &&
190+ ((parsetree == NULL ) || (nodeTag (parsetree ) != T_CreatedbStmt )) &&
191+ !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
192+ {
193+ Oid serverid ;
194+ UserMapping * user ;
195+ char * query ,
196+ * query_container ,
197+ * plan ,
198+ * plan_container ;
199+ int qlen ,
200+ qlen1 ,
201+ plen ,
202+ plen1 ;
203+ PGresult * res ;
204+
205+ serverid = get_foreign_server_oid (remote_server_fdwname , true);
206+ Assert (OidIsValid (serverid ));
207+
208+ user = GetUserMapping (GetUserId (), serverid );
209+ conn = GetConnection (user , true);
210+
211+ set_portable_output (true);
212+ plan = nodeToString (queryDesc -> plannedstmt );
213+ set_portable_output (false);
214+ plen = b64_enc_len (plan , strlen (plan ) + 1 );
215+ plan_container = (char * ) palloc0 (plen + 1 );
216+ plen1 = b64_encode (plan , strlen (plan ), plan_container );
217+ Assert (plen > plen1 );
218+
219+ qlen = b64_enc_len (queryDesc -> sourceText , strlen (queryDesc -> sourceText ) + 1 );
220+ query_container = (char * ) palloc0 (qlen + 1 );
221+ qlen1 = b64_encode (queryDesc -> sourceText , strlen (queryDesc -> sourceText ), query_container );
222+ Assert (qlen > qlen1 );
223+
224+ query = palloc0 (qlen + plen + 100 );
225+ sprintf (query , "SELECT public.pg_exec_plan('%s', '%s');" , query_container , plan_container );
226+
227+ res = PQexec (conn , query );
228+ PQclear (res );
229+ pfree (query );
230+ pfree (query_container );
231+ pfree (plan_container );
232+ }
230233}
231234
232235static void
0 commit comments