2323static set_rel_pathlist_hook_type prev_set_rel_pathlist_hook = NULL ;
2424static shmem_startup_hook_type PreviousShmemStartupHook = NULL ;
2525static set_join_pathlist_hook_type prev_set_join_pathlist_hook = NULL ;
26- static void second_stage_paths (PlannerInfo * root , List * firstStagePaths , RelOptInfo * joinrel ,
26+ static void second_stage_paths (PlannerInfo * root , RelOptInfo * joinrel ,
2727 RelOptInfo * outerrel , RelOptInfo * innerrel ,
2828 JoinType jointype , JoinPathExtraData * extra );
2929
@@ -95,15 +95,56 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
9595 JoinPathExtraData * extra )
9696{
9797 ListCell * lc ;
98- List * firstStagePaths = NIL ; /* Trivial paths, made with exchange */
98+ int i = 0 ;
99+ List * newpaths = NIL ;
100+ List * delpaths = NIL ;
101+ List * rpaths = NIL ;
102+ int distpaths = 0 ;
99103
100104 if (prev_set_join_pathlist_hook )
101105 prev_set_join_pathlist_hook (root , joinrel , outerrel , innerrel ,
102106 jointype , extra );
103107
104108 /*
105- * At first, traverse all paths and search for the case with Exchanges at
106- * the left or right subtree. We need to delete DistPlanExec nodes and
109+ * Search for distributed paths for children relations.
110+ */
111+ foreach (lc , innerrel -> pathlist )
112+ {
113+ Path * path = lfirst (lc );
114+
115+ if (IsDistExecNode (path ))
116+ distpaths ++ ;
117+ }
118+
119+ if (distpaths == 0 )
120+ /* Distributed query execution does not needed. */
121+ return ;
122+
123+ /*
124+ * Force try to add hash joins into the pathlist. Collect any paths that
125+ * do not satisfy the exchange rules and delete it.
126+ */
127+ foreach (lc , joinrel -> pathlist )
128+ {
129+ Path * path = lfirst (lc );
130+ if ((path -> pathtype != T_HashJoin ) && !IsDistExecNode (path ))
131+ rpaths = lappend (rpaths , path );
132+ }
133+ foreach (lc , rpaths )
134+ joinrel -> pathlist = list_delete_ptr (joinrel -> pathlist , lfirst (lc ));
135+
136+ /*
137+ * Try to create hash join path.
138+ * XXX: Here we have a problem of path additions: hash join may be not added
139+ * if it is not cheap. But cost of the this path includes costs of child paths.
140+ * Child paths will be tuned below and costs will be changed too.
141+ */
142+ hash_inner_and_outer (root , joinrel , outerrel , innerrel , jointype , extra );
143+ Assert (list_length (joinrel -> pathlist ) > 0 );
144+
145+ /*
146+ * Traverse all paths and search for the case with EXCHANGE nodes
147+ * at the left or right subtree. We need to delete DistPlanExec nodes and
107148 * insert only one at the head of join.
108149 */
109150 foreach (lc , joinrel -> pathlist )
@@ -115,9 +156,15 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
115156 CustomPath * sub ;
116157 Path * path = lfirst (lc );
117158
118- if ((path -> pathtype != T_NestLoop ) &&
119- (path -> pathtype != T_MergeJoin ) &&
120- (path -> pathtype != T_HashJoin ))
159+ /*
160+ * NestLoop and MergeJoin need to change EXCHANGE node logic and
161+ * disabled for now.
162+ * For this we need to introduce remote PUSH or PULL operation for
163+ * force transfer tuples from instance to instance.
164+ */
165+ Assert (path -> pathtype != T_NestLoop && path -> pathtype != T_MergeJoin );
166+
167+ if (path -> pathtype != T_HashJoin )
121168 continue ;
122169
123170 jp = (JoinPath * ) path ;
@@ -128,9 +175,7 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
128175 * If inner path contains DistExec node - save its servers list and
129176 * delete it from the path.
130177 */
131- if ((inner -> pathtype == T_CustomScan ) &&
132- (strcmp (((CustomPath * ) inner )-> methods -> CustomName ,
133- DISTEXECPATHNAME ) == 0 ))
178+ if (IsDistExecNode (inner ))
134179 {
135180 ListCell * lc ;
136181
@@ -145,15 +190,19 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
145190 }
146191 Assert (list_length (sub -> custom_paths ) == 1 );
147192 jp -> innerjoinpath = (Path * ) linitial (sub -> custom_paths );
193+ } else
194+ {
195+ elog (LOG , "inner Path can't contain any FE nodes. JT=%d (innt=%d patht=%d), (outt=%d patht=%d) %d" ,
196+ jp -> jointype , inner -> type , inner -> pathtype ,
197+ outer -> type , outer -> pathtype , i ++ );
198+
148199 }
149200
150201 /*
151202 * If outer path contains DistExec node - save its servers list and
152203 * delete it from the path.
153204 */
154- if ((outer -> pathtype == T_CustomScan ) &&
155- (strcmp (((CustomPath * ) outer )-> methods -> CustomName ,
156- DISTEXECPATHNAME ) == 0 ))
205+ if (IsDistExecNode (outer ))
157206 {
158207 ListCell * lc ;
159208
@@ -177,25 +226,29 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
177226 path = create_distexec_path (root , joinrel ,
178227 (Path * ) copy_join_pathnode (jp ),
179228 servers );
180- add_path (joinrel , path );
229+ newpaths = lappend (newpaths , path );
230+ delpaths = lappend (delpaths , jp );
231+ }
181232
182- /*
183- * We need guarantee, that previous JOIN path was deleted. It was
184- * incorrect.
185- */
186- list_delete_ptr (joinrel -> pathlist , jp );
233+ /*
234+ * We need to guarantee, that previous JOIN path was deleted from the path
235+ * list. It was incorrect.
236+ */
237+ foreach (lc , delpaths )
238+ {
239+ Path * path = lfirst (lc );
240+ joinrel -> pathlist = list_delete_ptr (joinrel -> pathlist , path );
241+ }
187242
188- /* Save link to the path for future works. */
189- firstStagePaths = lappend (firstStagePaths , path );
243+ foreach (lc , newpaths )
244+ {
245+ Path * path = lfirst (lc );
246+ add_path (joinrel , path );
190247 }
191248
192- second_stage_paths (root , firstStagePaths , joinrel , outerrel , innerrel , jointype ,
193- extra );
249+ second_stage_paths (root , joinrel , outerrel , innerrel , jointype , extra );
194250}
195251
196- #define IsDistExecNode (pathnode ) ((pathnode->path.pathtype == T_CustomScan) && \
197- (strcmp(((CustomPath *)pathnode)->methods->CustomName, DISTEXECPATHNAME) == 0))
198-
199252static CustomPath *
200253duplicate_join_path (CustomPath * distExecPath )
201254{
@@ -321,10 +374,7 @@ arrange_partitioning_attrs(RelOptInfo *rel1,
321374 part_scheme -> partnatts ++ ;
322375 ReleaseSysCache (opclasstup );
323376 }
324- //elog(INFO, "arrange_partitioning_attrs: ");
325- //elog(INFO, "->1: %s ", nodeToString(rel1->partexprs[0]));
326- //elog(INFO, "->2: %s ", nodeToString(rel2->partexprs[0]));
327- //elog(INFO, "restrictlist: %s", nodeToString(restrictlist));
377+
328378 /* Now we use hash partition only */
329379 Assert ((rel1 -> part_scheme -> strategy == PARTITION_STRATEGY_HASH ) &&
330380 (rel1 -> part_scheme -> strategy == rel2 -> part_scheme -> strategy ));
@@ -394,24 +444,38 @@ arrange_partitions(RelOptInfo *rel1,
394444 * Add Paths same as the case of partitionwise join.
395445 */
396446static void
397- second_stage_paths (PlannerInfo * root , List * firstStagePaths , RelOptInfo * joinrel , RelOptInfo * outerrel ,
447+ second_stage_paths (PlannerInfo * root , RelOptInfo * joinrel , RelOptInfo * outerrel ,
398448 RelOptInfo * innerrel , JoinType jointype , JoinPathExtraData * extra )
399449{
400450 ListCell * lc ;
401451
402- if (list_length (firstStagePaths ) == 0 )
403- return ;
452+ elog (LOG , "List length=%d" , list_length (joinrel -> pathlist ));
453+ foreach (lc , joinrel -> pathlist )
454+ {
455+ Path * path = lfirst (lc );
456+ elog (LOG , "[%d] BEFORE SECOND STAGE: type=%d pathtype=%d" ,
457+ list_length (joinrel -> pathlist ), path -> type , path -> pathtype );
458+ if (path -> type > 300 )
459+ Assert (0 );
460+ }
404461
405- foreach (lc , firstStagePaths )
462+ foreach (lc , joinrel -> pathlist )
406463 {
407- CustomPath * path = (CustomPath * ) lfirst (lc );
464+ Path * pathhead = (Path * ) lfirst (lc );
465+ CustomPath * path ;
408466 JoinPath * jp ;
409467 ExchangePath * innerex ;
410468 ExchangePath * outerex ;
411469 ExchangePath * expath ;
412470 int i ;
413471
414- Assert (IsDistExecNode (path ));
472+ if (!IsDistExecNode (pathhead ))
473+ {
474+ elog (LOG , "NO second_stage_paths. type=%d, pathtype=%d" , pathhead -> type , pathhead -> pathtype );
475+ continue ;
476+ }
477+
478+ path = (CustomPath * ) pathhead ;
415479
416480 /*
417481 * Add gather-type EXCHANGE node into the head of the path.
@@ -420,7 +484,9 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
420484 if (!IsA (((Path * ) linitial (path -> custom_paths )), CustomScan ))
421485 {
422486 jp = (JoinPath * ) linitial (path -> custom_paths );
423- Assert (jp -> path .pathtype == T_HashJoin );
487+
488+ if (jp -> path .pathtype != T_HashJoin )
489+ continue ;
424490 expath = create_exchange_path (root , joinrel , (Path * ) jp , GATHER_MODE );
425491 path -> custom_paths = list_delete (path -> custom_paths , jp );
426492 path -> custom_paths = lappend (path -> custom_paths , expath );
@@ -433,7 +499,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
433499 & innerex -> altrel , extra -> restrictlist , jointype ))
434500 {
435501 /* Simple case like foreign-push-join case. */
436- // elog(INFO, "--- MAKE SIMPLE PATH ---");
437502 innerex -> mode = STEALTH_MODE ;
438503 outerex -> mode = STEALTH_MODE ;
439504 }
@@ -442,7 +507,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
442507 CustomPath * newpath ;
443508 bool res ;
444509
445- // elog(INFO, "--- MAKE SHUFFLE PATH ---");
446510 /* Get a copy of the simple path */
447511 newpath = duplicate_join_path (path );
448512 set_path_pointers (newpath , & jp , & expath , & outerex , & innerex );
0 commit comments