1- /*
1+ /*-------------------------------------------------------------------------
2+ *
23 * repeater.c
4+ * Simple demo for remote plan execution patch.
35 *
6+ * Transfer query plan to a remote instance and wait for result.
7+ * Remote instance parameters (host, port) defines by GUCs.
8+ *
9+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+ * Portions Copyright (c) 2018-2019, Postgres Professional
11+ *-------------------------------------------------------------------------
412 */
513
614#include "postgres.h"
1119#include "fmgr.h"
1220#include "libpq/libpq.h"
1321#include "libpq-fe.h"
14- #include "nodes/params.h"
1522#include "optimizer/planner.h"
1623#include "tcop/utility.h"
1724#include "utils/guc.h"
18- #include "utils/memutils.h"
19- #include "utils/plancache.h"
2025
2126PG_MODULE_MAGIC ;
2227
@@ -32,12 +37,14 @@ static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
3237 char * completionTag );
3338static void HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags );
3439static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
35- static int execute_query (PGconn * dest , QueryDesc * queryDesc , int eflags );
36-
3740
41+ /* Remote instance parameters. */
3842char * repeater_host_name ;
3943int repeater_port_number ;
4044
45+ static bool ExtensionIsActivated = false;
46+ static PGconn * conn = NULL ;
47+
4148/*
4249 * Module load/unload callback
4350 */
@@ -79,8 +86,6 @@ _PG_init(void)
7986 ExecutorEnd_hook = HOOK_ExecEnd_injection ;
8087}
8188
82- static PGconn * conn = NULL ;
83-
8489static PGconn *
8590EstablishConnection (void )
8691{
@@ -90,17 +95,17 @@ EstablishConnection(void)
9095 return conn ;
9196
9297 /* Connect to slave and send it a query plan */
93- sprintf (conninfo , "host=localhost port=5433%c" , '\0' );
98+ sprintf (conninfo , "host=%s port=%d %c" , repeater_host_name , repeater_port_number , '\0' );
9499 conn = PQconnectdb (conninfo );
95100
96101 if (PQstatus (conn ) == CONNECTION_BAD )
97102 elog (LOG , "Connection error. conninfo: %s" , conninfo );
103+ else
104+ elog (LOG , "Connection established: host=%s, port=%d" , repeater_host_name , repeater_port_number );
98105
99106 return conn ;
100107}
101108
102- static bool ExtensionIsActivated = false;
103-
104109static bool
105110ExtensionIsActive (void )
106111{
@@ -117,6 +122,10 @@ ExtensionIsActive(void)
117122 return ExtensionIsActivated ;
118123}
119124
125+ /*
126+ * We need to send some DML queries for sync database schema to a plan execution
127+ * at a remote instance.
128+ */
120129static void
121130HOOK_Utility_injection (PlannedStmt * pstmt ,
122131 const char * queryString ,
@@ -126,27 +135,35 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
126135 DestReceiver * dest ,
127136 char * completionTag )
128137{
129- Node * parsetree = pstmt -> utilityStmt ;
138+ Node * parsetree = pstmt -> utilityStmt ;
139+ PGresult * result ;
130140
141+ /*
142+ * Very non-trivial decision about transferring utility query to data nodes.
143+ * This exception list used for demonstration and let us to execute some
144+ * simple queries.
145+ */
131146 if (ExtensionIsActive () &&
147+ pstmt -> canSetTag &&
132148 (nodeTag (parsetree ) != T_CopyStmt ) &&
133149 (nodeTag (parsetree ) != T_CreateExtensionStmt ) &&
134150 (nodeTag (parsetree ) != T_ExplainStmt ) &&
151+ (nodeTag (parsetree ) != T_FetchStmt ) &&
135152 (context != PROCESS_UTILITY_SUBCOMMAND )
136153 )
137154 {
138- PGresult * result ;
139-
155+ /*
156+ * Previous query could be completed with error report at this instance.
157+ * In this case, we need to prepare connection to the remote instance.
158+ */
140159 while ((result = PQgetResult (EstablishConnection ())) != NULL );
141160
142161 if (PQsendQuery (EstablishConnection (), queryString ) == 0 )
143- {
144- elog ( ERROR , "Sending UTILITY query error: %s" , queryString );
145- PQreset ( conn );
146- }
162+ elog ( ERROR , "Connection error: query: %s, status=%d, errmsg=%s" ,
163+ queryString ,
164+ PQstatus ( EstablishConnection ()),
165+ PQerrorMessage ( EstablishConnection ()));
147166 }
148- else
149- elog (LOG , "UTILITY query without sending: %s" , queryString );
150167
151168 if (next_ProcessUtility_hook )
152169 (* next_ProcessUtility_hook ) (pstmt , queryString , context , params ,
@@ -156,48 +173,51 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
156173 context , params , queryEnv ,
157174 dest , completionTag );
158175
176+ /*
177+ * Check end of query execution at the remote instance.
178+ */
159179 if (conn )
160- {
161- PGresult * result ;
162-
163180 while ((result = PQgetResult (conn )) != NULL );
164- }
165181}
166- static int IsExecuted = 0 ;
167182
168183static void
169184HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags )
170185{
171- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
186+ Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
187+ PGresult * result ;
188+ PGconn * dest = EstablishConnection ();
172189
173190 if (prev_ExecutorStart )
174191 prev_ExecutorStart (queryDesc , eflags );
175192 else
176193 standard_ExecutorStart (queryDesc , eflags );
177-
178- IsExecuted ++ ;
179-
180- if (IsExecuted > 1 )
181- return ;
182-
183- if (
184- ExtensionIsActive () &&
185- (repeater_host_name == 0 ) &&
186- ((parsetree == NULL ) || (nodeTag (parsetree ) != T_CreatedbStmt )) &&
187- !(eflags & EXEC_FLAG_EXPLAIN_ONLY )
188- )
194+ elog (LOG , "QUERY: %s" , queryDesc -> sourceText );
195+ /*
196+ * This not fully correct sign for prevent passing each subquery to
197+ * the remote instance. Only for demo.
198+ */
199+ if (ExtensionIsActive () &&
200+ queryDesc -> plannedstmt -> canSetTag &&
201+ ((parsetree == NULL ) || (nodeTag (parsetree ) != T_CreatedbStmt )) &&
202+ !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
189203 {
190- elog (LOG , "Send query: %s" , queryDesc -> sourceText );
191- if (execute_query (EstablishConnection (), queryDesc , eflags ) == 0 )
192- PQreset (conn );
204+ /*
205+ * Prepare connection.
206+ */
207+ while ((result = PQgetResult (dest )) != NULL );
208+ elog (LOG , "->QUERY: %s" , queryDesc -> sourceText );
209+ if (PQsendPlan (dest , serialize_plan (queryDesc , eflags )) == 0 )
210+ /*
211+ * Report about remote execution error.
212+ */
213+ elog (ERROR , "Connection errors during PLAN transferring: status=%d, errmsg=%s" ,
214+ PQstatus (dest ), PQerrorMessage (dest ));
193215 }
194216}
195217
196218static void
197219HOOK_ExecEnd_injection (QueryDesc * queryDesc )
198220{
199- IsExecuted -- ;
200- /* Execute before hook because it destruct memory context of exchange list */
201221 if (conn )
202222 {
203223 PGresult * result ;
@@ -210,32 +230,3 @@ HOOK_ExecEnd_injection(QueryDesc *queryDesc)
210230 else
211231 standard_ExecutorEnd (queryDesc );
212232}
213-
214-
215- /*
216- * Serialize plan and send it to the destination instance
217- */
218- static int
219- execute_query (PGconn * dest , QueryDesc * queryDesc , int eflags )
220- {
221- PGresult * result ;
222-
223- Assert (dest != NULL );
224-
225- /*
226- * Before send of plan we need to check connection state.
227- * If previous query was failed, we get PGRES_FATAL_ERROR.
228- */
229- while ((result = PQgetResult (dest )) != NULL );
230-
231- if (PQsendPlan (dest , serialize_plan (queryDesc , eflags )) == 0 )
232- {
233- /*
234- * Report about remote execution error and return control to caller.
235- */
236- elog (ERROR , "PLAN sending error." );
237- return 0 ;
238- }
239-
240- return 1 ;
241- }
0 commit comments