22#include "postgres.h"
33
44#include "catalog/namespace.h"
5- #include "commands/extension.h"
6- #include "executor/execdesc.h"
7- #include "executor/executor.h"
8- #include "fmgr.h"
9- #include "libpq/libpq.h"
10- #include "libpq-fe.h"
11- #include "nodes/params.h"
12- #include "planwalker.h"
13- #include "optimizer/planner.h"
5+ #include "common/base64.h"
6+ #include "nodes/nodeFuncs.h"
147#include "storage/lmgr.h"
15- #include "tcop/utility.h"
168#include "utils/builtins.h"
17- #include "utils/guc.h"
189#include "utils/lsyscache.h"
19- #include "utils/plancache.h"
2010#include "utils/snapmgr.h"
2111
12+ #include "exec_plan.h"
13+ #include "planwalker.h"
14+
2215PG_MODULE_MAGIC ;
2316
2417PG_FUNCTION_INFO_V1 (pg_execute_plan );
2518
26- void _PG_init (void );
27-
28- static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
29- static planner_hook_type prev_planner_hook = NULL ;
30- static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
31- static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
32-
33- static void HOOK_Utility_injection (PlannedStmt * pstmt , const char * queryString ,
34- ProcessUtilityContext context , ParamListInfo params ,
35- QueryEnvironment * queryEnv , DestReceiver * dest ,
36- char * completionTag );
37- static PlannedStmt * HOOK_Planner_injection (Query * parse , int cursorOptions ,
38- ParamListInfo boundParams );
39- static void HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags );
40- static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
4119static char * serialize_plan (QueryDesc * queryDesc , int eflags );
42- static void execute_query (char * planString );
4320static bool store_irel_name (Plan * plan , char * buffer );
4421
45- static PGconn * conn = NULL ;
46-
47- int node_number1 = 0 ;
48-
49- /*
50- * Module load/unload callback
51- */
52- void
53- _PG_init (void )
54- {
55- DefineCustomIntVariable ("pargres.node" ,
56- "Node number in instances collaboration" ,
57- NULL ,
58- & node_number1 ,
59- 0 ,
60- 0 ,
61- 1023 ,
62- PGC_SIGHUP ,
63- GUC_NOT_IN_SAMPLE ,
64- NULL ,
65- NULL ,
66- NULL );
67-
68- /* ProcessUtility hook */
69- next_ProcessUtility_hook = ProcessUtility_hook ;
70- ProcessUtility_hook = HOOK_Utility_injection ;
71-
72- /* Planner hook */
73- prev_planner_hook = planner_hook ;
74- planner_hook = HOOK_Planner_injection ;
75-
76- prev_ExecutorStart = ExecutorStart_hook ;
77- ExecutorStart_hook = HOOK_ExecStart_injection ;
78-
79- prev_ExecutorEnd = ExecutorEnd_hook ;
80- ExecutorEnd_hook = HOOK_ExecEnd_injection ;
81- }
82-
83- static void
84- HOOK_Utility_injection (PlannedStmt * pstmt ,
85- const char * queryString ,
86- ProcessUtilityContext context ,
87- ParamListInfo params ,
88- QueryEnvironment * queryEnv ,
89- DestReceiver * dest ,
90- char * completionTag )
91- {
92- Node * parsetree = pstmt -> utilityStmt ;
93-
94- if ((OidIsValid (get_extension_oid ("execplan" , true))) &&
95- (node_number1 == 0 ) &&
96- (nodeTag (parsetree ) != T_CopyStmt ) &&
97- (nodeTag (parsetree ) != T_CreateExtensionStmt ) &&
98- (context != PROCESS_UTILITY_SUBCOMMAND ))
99- {
100- char conninfo [1024 ];
101- int status ;
102-
103- // elog(LOG, "Send UTILITY query %d: %s", nodeTag(parsetree), queryString);
104-
105- /* Connect to slave and send it a query plan */
106- sprintf (conninfo , "host=localhost port=5433%c" , '\0' );
107- conn = PQconnectdb (conninfo );
108- if (PQstatus (conn ) == CONNECTION_BAD )
109- elog (LOG , "Connection error. conninfo: %s" , conninfo );
110-
111- status = PQsendQuery (conn , queryString );
112- if (status == 0 )
113- elog (ERROR , "Query sending error: %s" , PQerrorMessage (conn ));
114- }
115- else if (node_number1 == 0 )
116- elog (LOG , "UTILITY query without sending: %s" , queryString );
117-
118- if (next_ProcessUtility_hook )
119- (* next_ProcessUtility_hook ) (pstmt , queryString , context , params ,
120- queryEnv , dest , completionTag );
121- else
122- standard_ProcessUtility (pstmt , queryString ,
123- context , params , queryEnv ,
124- dest , completionTag );
125-
126- if (conn )
127- {
128- PGresult * result ;
129-
130- while ((result = PQgetResult (conn )) != NULL )
131- Assert (PQresultStatus (result ) != PGRES_FATAL_ERROR );
132- PQfinish (conn );
133- conn = NULL ;
134- }
135- }
136-
13722/*
13823 * INPUT: a base64-encoded serialized plan
13924 */
140- static void
141- execute_query (char * planString )
25+ void
26+ execute_query (PGconn * dest , QueryDesc * queryDesc , int eflags )
14227{
143- char conninfo [1024 ];
14428 char * SQLCommand ;
14529 int status ;
30+ char * serializedPlan ;
31+ PGresult * result ;
14632
147- /* Connect to slave and send it a query plan */
148- sprintf (conninfo , "host=localhost port=5433%c" , '\0' );
149- conn = PQconnectdb (conninfo );
150- if (PQstatus (conn ) == CONNECTION_BAD )
151- elog (LOG , "Connection error. conninfo: %s" , conninfo );
152-
153- SQLCommand = (char * ) palloc0 (strlen (planString )+ 100 );
154- sprintf (SQLCommand , "SELECT pg_execute_plan('%s');" , planString );
155- //elog(LOG, "query: %s", SQLCommand);
156- status = PQsendQuery (conn , SQLCommand );
157- if (status == 0 )
158- elog (ERROR , "Query sending error: %s" , PQerrorMessage (conn ));
159- }
33+ Assert (dest != NULL );
16034
161- static PlannedStmt *
162- HOOK_Planner_injection (Query * parse , int cursorOptions ,
163- ParamListInfo boundParams )
164- {
165- PlannedStmt * pstmt ;
166-
167- conn = NULL ;
168-
169- if (prev_planner_hook )
170- pstmt = prev_planner_hook (parse , cursorOptions , boundParams );
171- else
172- pstmt = standard_planner (parse , cursorOptions , boundParams );
173-
174- if ((node_number1 > 0 ) || (parse -> utilityStmt != NULL ))
175- return pstmt ;
176-
177- /* Extension is not initialized. */
178- if (OidIsValid (get_extension_oid ("execplan" , true)))
179- {
180-
181- }
182- return pstmt ;
183- }
184-
185- static void
186- HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags )
187- {
188- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
189-
190- if (prev_ExecutorStart )
191- prev_ExecutorStart (queryDesc , eflags );
192- else
193- standard_ExecutorStart (queryDesc , eflags );
194-
195- if ((OidIsValid (get_extension_oid ("execplan" , true))) &&
196- (node_number1 == 0 ) &&
197- ((parsetree == NULL ) || (nodeTag (parsetree ) != T_CreatedbStmt )))
198- {
199- // elog(LOG, "Send query: %s", queryDesc->sourceText);
200- execute_query (serialize_plan (queryDesc , eflags ));
201- }
202- }
35+ /*
36+ * Before send of plan we need to check connection state.
37+ * If previous query was failed, we get PGRES_FATAL_ERROR.
38+ */
39+ while ((result = PQgetResult (dest )) != NULL );
20340
204- static void
205- HOOK_ExecEnd_injection (QueryDesc * queryDesc )
206- {
207- /* Execute before hook because it destruct memory context of exchange list */
208- if (conn )
209- {
210- PGresult * result ;
41+ serializedPlan = serialize_plan (queryDesc , eflags );
42+ /* Connect to slave and send it a query plan */
43+ SQLCommand = (char * ) palloc0 (strlen (serializedPlan )+ 100 );
44+ sprintf (SQLCommand , "SELECT pg_execute_plan('%s');" , serializedPlan );
21145
212- while ((result = PQgetResult (conn )) != NULL )
213- Assert (PQresultStatus (result ) != PGRES_FATAL_ERROR );
214- PQfinish (conn );
215- conn = NULL ;
216- }
46+ status = PQsendQuery (dest , SQLCommand );
21747
218- if (prev_ExecutorEnd )
219- prev_ExecutorEnd (queryDesc );
220- else
221- standard_ExecutorEnd (queryDesc );
48+ if (status == 0 )
49+ elog (ERROR , "Query sending error: %s" , PQerrorMessage (dest ));
22250}
22351
224- #include "common/base64.h"
225- #include "nodes/nodeFuncs.h"
226-
22752static bool
22853compute_irels_buffer_len (Plan * plan , int * length )
22954{
@@ -258,8 +83,6 @@ compute_irels_buffer_len(Plan *plan, int *length)
25883 return plan_tree_walker (plan , compute_irels_buffer_len , length );
25984}
26085
261- //#include "nodes/pg_list.h"
262-
26386static char *
26487serialize_plan (QueryDesc * queryDesc , int eflags )
26588{
@@ -275,7 +98,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
27598 * econtainer ,
27699 * start_address ;
277100 ListCell * lc ;
278-
101+ elog ( LOG , "Send QUERY: %s" , queryDesc -> sourceText );
279102 serialized_plan = nodeToString (queryDesc -> plannedstmt );
280103
281104 /*
@@ -302,11 +125,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
302125 * to save the relation names in serialized plan.
303126 */
304127 compute_irels_buffer_len (queryDesc -> plannedstmt -> planTree , & inames_len );
305- // plan_tree_walker(queryDesc->plannedstmt->planTree,
306- // compute_irels_buffer_len,
307- // &inames_len);
308- // planstate_tree_walker((PlanState *) (queryDesc->planstate), compute_irels_buffer_len, &inames_len);
309- //elog(LOG, "inames_len=%d", inames_len);
128+
310129 /* We use len+1 bytes for include end-of-string symbol. */
311130 splan_len = strlen (serialized_plan ) + 1 ;
312131 qtext_len = strlen (queryDesc -> sourceText ) + 1 ;
@@ -353,9 +172,6 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
353172 }
354173 }
355174 store_irel_name ((Plan * ) (queryDesc -> plannedstmt -> planTree ), start_address );
356- // plan_tree_walker((Plan *) (queryDesc->plannedstmt->planTree),
357- // store_irel_name,
358- // start_address);
359175
360176 start_address += inames_len ;
361177 Assert ((start_address - container ) == tot_len );
@@ -525,7 +341,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
525341
526342 /* Restore query source text string */
527343 queryString = start_addr ;
528- // elog(LOG, "queryString : %s", queryString);
344+ elog (LOG , "Recv QUERY : %s" , queryString );
529345 /* Restore instrument and flags */
530346 start_addr += strlen (queryString ) + 1 ;
531347 instrument_options = (int * ) start_addr ;
@@ -544,7 +360,6 @@ pg_execute_plan(PG_FUNCTION_ARGS)
544360 {
545361 rte -> relid = RelnameGetRelid (start_addr );
546362 Assert (rte -> relid != InvalidOid );
547- // elog(LOG, "Relation from decoded plan. relid=%d relname=%s", rte->relid, start_addr);
548363 start_addr += strlen (start_addr ) + 1 ;
549364 }
550365 }
@@ -565,7 +380,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
565380 ExecutorFinish (queryDesc );
566381 ExecutorEnd (queryDesc );
567382 FreeQueryDesc (queryDesc );
568-
383+ // elog(LOG, "End of QUERY: %s", queryString);
569384 pfree (decdata );
570385 PG_RETURN_BOOL (true);
571386}
0 commit comments