@@ -65,6 +65,8 @@ static void EXCHANGE_InitializeWorker(CustomScanState *node,
6565static Node * EXCHANGE_Create_state (CustomScan * node );
6666
6767
68+ #define END_OF_TUPLES 'E'
69+ #define END_OF_EXCHANGE 'Q'
6870void
6971EXCHANGE_Init_methods (void )
7072{
@@ -95,6 +97,7 @@ EXCHANGE_Init_methods(void)
9597 DistExec_Init_methods ();
9698}
9799
100+ #include "nodes/relation.h"
98101/*
99102 * Add one path for a base relation target: replace all ForeignScan nodes by
100103 * local Scan nodes.
@@ -116,7 +119,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
116119 */
117120 return ;
118121
119- elog (INFO , "INSERT EXCHANGE" );
122+ elog (INFO , "INSERT EXCHANGE. paths: %d" , list_length ( rel -> pathlist ) );
120123
121124 /* Traverse all possible paths and search for APPEND */
122125 foreach (lc , rel -> pathlist )
@@ -125,16 +128,15 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
125128 Path * tmpLocalScanPath = NULL ;
126129 AppendPath * appendPath = NULL ;
127130 ListCell * lc1 ;
128- List * private_data = NIL ;
129-
130- Assert (path -> pathtype != T_MergeAppend ); /* Do it later */
131+ Bitmapset * servers = NULL ;
132+ List * subpaths = NIL ;
131133
132134 if (path -> pathtype != T_Append )
133135 continue ;
134136
135- appendPath = makeNode ( AppendPath );
136- memcpy ( appendPath , path , sizeof ( AppendPath ));
137- appendPath -> subpaths = NIL ;
137+ // elog(INFO, "-> IE. path params: %hhu, ptype: %d, tcost: %f, scost: %f",
138+ // path->param_info != NULL , path->pathtype,
139+ // path->total_cost, path->startup_cost) ;
138140
139141 /*
140142 * Traverse all APPEND subpaths, check for scan-type and search for
@@ -145,7 +147,9 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
145147 Path * subpath = (Path * ) lfirst (lc1 );
146148 Path * tmpPath ;
147149 Oid serverid = InvalidOid ;
148-
150+ elog (INFO , "--> IE. subpath params: %hhu, ptype: %d, tcost: %f, scost: %f" ,
151+ subpath -> param_info != NULL , subpath -> pathtype ,
152+ subpath -> total_cost , subpath -> startup_cost );
149153 if ((subpath -> pathtype != T_ForeignScan ) && (tmpLocalScanPath ))
150154 /* Check assumption No.1 */
151155 Assert (tmpLocalScanPath -> pathtype == subpath -> pathtype );
@@ -159,8 +163,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
159163
160164 case T_ForeignScan :
161165 serverid = subpath -> parent -> serverid ;
166+ if (PATH_REQ_OUTER (subpath ) != NULL )
167+ continue ;
162168 tmpPath = (Path * ) makeNode (SeqScan );
163- tmpPath = create_seqscan_path (root , subpath -> parent , subpath -> parent -> lateral_relids , 0 );
169+ tmpPath = create_seqscan_path (root , subpath -> parent ,
170+ PATH_REQ_OUTER (subpath ), 0 );
164171 break ;
165172
166173 default :
@@ -170,22 +177,25 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
170177 if (!tmpLocalScanPath )
171178 tmpLocalScanPath = tmpPath ;
172179
173- appendPath -> subpaths = lappend (appendPath -> subpaths , tmpPath );
174- if (OidIsValid (serverid ))
175- private_data = lappend_oid (private_data , serverid );
180+ subpaths = lappend (subpaths , tmpPath );
181+ // appendPath->subpaths = lappend(appendPath->subpaths, tmpPath);
182+ if (OidIsValid (serverid ) && !bms_is_member ((int )serverid , servers ))
183+ servers = bms_add_member (servers , serverid );
176184 }
177185
178- if (private_data == NIL )
186+ if (servers == NULL )
179187 {
180- pfree (appendPath );
181188 elog (INFO , "NO one foreign source found" );
182189 continue ;
183190 }
184191 else
185- elog (INFO , "Source found: %d" , list_length ( private_data ));
192+ elog (INFO , "Source found: %d" , bms_num_members ( servers ));
186193
194+ appendPath = create_append_path (root , rel , subpaths , NIL ,
195+ PATH_REQ_OUTER (tmpLocalScanPath ), 0 , false,
196+ ((AppendPath * )path )-> partitioned_rels , -1 );
187197 path = create_exchange_path (root , rel , (Path * ) appendPath );
188- path = create_distexec_path (root , rel , path , private_data );
198+ path = create_distexec_path (root , rel , path , servers );
189199 add_path (rel , path );
190200 }
191201}
@@ -206,8 +216,8 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, Path *path)
206216
207217 /* Now I do not want to think about cost estimations. */
208218 path -> rows = baserel -> tuples ;
209- path -> startup_cost = 0 .0001 ;
210- path -> total_cost = path -> startup_cost + 0 .0001 * path -> rows ;
219+ path -> startup_cost = 10000 .0001 ;
220+ path -> total_cost = path -> startup_cost + 100000 .0001 * path -> rows ;
211221}
212222
213223/* XXX: Need to be placed in shared memory */
@@ -342,7 +352,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
342352{
343353 CustomScan * cscan = (CustomScan * ) node -> ss .ps .plan ;
344354 Plan * scan_plan ;
345- bool explain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY ) != 0 );
355+ // bool explain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0);
346356 PlanState * planState ;
347357 ExchangeState * state = (ExchangeState * ) node ;
348358 TupleDesc scan_tupdesc ;
@@ -353,7 +363,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
353363 planState = (PlanState * ) ExecInitNode (scan_plan , estate , eflags );
354364 node -> custom_ps = lappend (node -> custom_ps , planState );
355365
356- Assert ( Stream_subscribe (state -> stream ) );
366+ Stream_subscribe (state -> stream );
357367
358368 state -> init = false;
359369 state -> ltuples = 0 ;
@@ -375,23 +385,33 @@ distribution_fn_gather(TupleTableSlot *slot, DMQDestCont *dcont)
375385 return -1 ;
376386}
377387
378- static TupleTableSlot *
379- EXCHANGE_Execute ( CustomScanState * node )
388+ static void
389+ init_state_ifany ( ExchangeState * state )
380390{
381- ScanState * ss = & node -> ss ;
382- ScanState * subPlanState = linitial (node -> custom_ps );
383- ExchangeState * state = (ExchangeState * ) node ;
384- bool readRemote = true;
385-
386391 if (!state -> init )
387392 {
388393 EphemeralNamedRelation enr = get_ENR (state -> estate -> es_queryEnv , destsName );
394+
395+ Assert (enr != NULL && enr -> reldata != NULL );
389396 state -> dests = (DMQDestCont * ) enr -> reldata ;
390- state -> init = true;
391397 state -> hasLocal = true;
392398 state -> activeRemotes = state -> dests -> nservers ;
399+ state -> init = true;
400+ // elog(INFO, "[%d] EXCHANGE Init", getpid());
393401 }
394402
403+ }
404+
405+ static TupleTableSlot *
406+ EXCHANGE_Execute (CustomScanState * node )
407+ {
408+ ScanState * ss = & node -> ss ;
409+ ScanState * subPlanState = linitial (node -> custom_ps );
410+ ExchangeState * state = (ExchangeState * ) node ;
411+ bool readRemote = true;
412+
413+ init_state_ifany (state );
414+
395415 for (;;)
396416 {
397417 TupleTableSlot * slot = NULL ;
@@ -405,22 +425,23 @@ EXCHANGE_Execute(CustomScanState *node)
405425
406426 slot = RecvTuple (ss -> ss_ScanTupleSlot -> tts_tupleDescriptor ,
407427 state -> stream , & status );
408- if (status == 0 )
428+ switch (status )
409429 {
410- if (TupIsNull (slot ))
411- {
412- state -> activeRemotes -- ;
413- // elog(LOG, "Finish remote receiving. r=%d", state->rtuples);
414- }
415- else
416- {
417- state -> rtuples ++ ;
418- // elog(LOG, "GOT tuple from another node. r=%d", state->rtuples);
419- return slot ;
420- }
430+ case -1 :
431+ /* No tuples currently */
432+ break ;
433+ case 0 :
434+ Assert (!TupIsNull (slot ));
435+ state -> rtuples ++ ;
436+ return slot ;
437+ case 1 :
438+ state -> activeRemotes -- ;
439+ break ;
440+ case 2 : /* Close EXCHANGE channel */
441+ break ;
442+ default :
443+ Assert (0 );
421444 }
422- // else
423- // elog(LOG, "No remote tuples for now");
424445 }
425446
426447 if ((state -> hasLocal ) && (!readRemote ))
@@ -429,9 +450,9 @@ EXCHANGE_Execute(CustomScanState *node)
429450 if (TupIsNull (slot ))
430451 {
431452 int i ;
432- //elog(LOG, "FINISH Local store: l=%d, r=%d", state->ltuples, state->rtuples);
453+ // elog(LOG, "[%s] FINISH Local store: l=%d, r=%d", state->stream , state->ltuples, state->rtuples);
433454 for (i = 0 ; i < state -> dests -> nservers ; i ++ )
434- SendTuple (state -> dests -> dests [i ].dest_id , state -> stream , NULL );
455+ SendByteMessage (state -> dests -> dests [i ].dest_id , state -> stream , END_OF_TUPLES );
435456 state -> hasLocal = false;
436457 continue ;
437458 }
@@ -444,7 +465,8 @@ EXCHANGE_Execute(CustomScanState *node)
444465
445466 if ((state -> activeRemotes == 0 ) && (!state -> hasLocal ))
446467 {
447- elog (LOG , "Exchange returns NULL: %d %d" , state -> ltuples , state -> rtuples );
468+ elog (LOG , "[%s] Exchange returns NULL: %d %d" , state -> stream ,
469+ state -> ltuples , state -> rtuples );
448470 return NULL ;
449471 }
450472
@@ -457,7 +479,6 @@ EXCHANGE_Execute(CustomScanState *node)
457479 return slot ;
458480 else
459481 {
460- // elog(LOG, "Send real tuple");
461482 SendTuple (dest , state -> stream , slot );
462483 }
463484 }
@@ -471,20 +492,25 @@ EXCHANGE_End(CustomScanState *node)
471492
472493 Assert (list_length (node -> custom_ps ) == 1 );
473494 ExecEndNode (linitial (node -> custom_ps ));
474- Assert (Stream_unsubscribe (state -> stream ));
475- elog (LOG , "EXCHANGE_END" );
476- /*
477- * Clean out exchange state
478- */
495+ Stream_unsubscribe (state -> stream );
496+
497+ elog (INFO , "EXCHANGE_END" );
479498}
480499
481500static void
482501EXCHANGE_Rescan (CustomScanState * node )
483502{
484- PlanState * outerPlan = outerPlanState (node );
503+ ExchangeState * state = (ExchangeState * ) node ;
504+ PlanState * subPlan = (PlanState * ) linitial (node -> custom_ps );
485505
486- if (outerPlan -> chgParam == NULL )
487- ExecReScan (outerPlan );
506+ init_state_ifany (state );
507+ elog (INFO , "Rescan exchange! %d" , getpid ());
508+ if (subPlan -> chgParam == NULL )
509+ ExecReScan (subPlan );
510+ state -> activeRemotes = state -> dests -> nservers ;
511+ state -> ltuples = 0 ;
512+ state -> rtuples = 0 ;
513+ state -> hasLocal = true;
488514}
489515
490516static void
@@ -500,8 +526,10 @@ static void
500526EXCHANGE_Explain (CustomScanState * node , List * ancestors , ExplainState * es )
501527{
502528 StringInfoData str ;
529+ ExchangeState * state = (ExchangeState * ) node ;
503530
504531 initStringInfo (& str );
532+ appendStringInfo (& str , "stream: %s. " , state -> stream );
505533 ExplainPropertyText ("Exchange" , str .data , es );
506534}
507535
0 commit comments