I am querying N tables remotely using dblink.
These queries represent a form of "sharding" of the data.
Each remote query takes roughly the same time (~ 0.5 sec).
However, when I gather all the data in one query for a simple count(*), using either a join, UNION ALL, or a CTE (WITH), the total time becomes "linear", i.e. roughly 0.5*N seconds.
That of course defeats much of the purpose of the "sharding".
Is there any way to force Postgres (11) to gather the data in parallel instead of running the remote queries one after another?
UPDATE
The remote query I am running on each "shard" is a function that wraps a single spatial query with an aggregation; it uses indexes and partitions:
CREATE OR REPLACE FUNCTION my_func(pt character(1), bd smallint, polyg geometry, tf date, tt date) RETURNS
TABLE(import_date date, bedrooms smallint, property_type character(1),
.....,r_rent_paid float[]) AS $$
BEGIN
RETURN query
select
coalesce(s.import_date, r.import_date) as import_date,
coalesce(s.bedrooms, r.bedrooms) as bedrooms,
coalesce(s.property_type, r.property_type) as property_type,
s.s_size,
.....,
r.rent_paid
from
(
select
sc.import_date,
sc.bedrooms,
sc.property_type,
......multiple percentile_cont......,
percentile_cont(array[0.25,0.5,0.75,0.9]) within group (order by sc.discount) filter(where sc.discount > 0) as discount
from node.sales sc
where sc.property_type = pt and sc.bedrooms = bd and st_Intersects(polyg,sc.geom)
and sc.import_date between tf and tt
group by sc.import_date, sc.bedrooms, sc.property_type
) s
full join
(
select
rc.import_date,
rc.bedrooms,
rc.property_type,
......multiple percentile_cont......,
percentile_cont(array[0.25,0.5,0.75,0.9]) within group (order by rc.asking_rent_sqft) filter(where rc.asking_rent_sqft > 0) as rent_paid_sqft
from node.rents rc
where rc.property_type = pt and rc.bedrooms = bd and st_Intersects(polyg,rc.geom)
and rc.import_date between tf and tt
group by rc.import_date, rc.bedrooms, rc.property_type
) r
on r.import_date = s.import_date and r.bedrooms = s.bedrooms and r.property_type = s.property_type;
END $$ language plpgsql;
I have tried running this function through postgres_fdw, creating the function only on the "master" and calling it like this:
select import_date, bedrooms, property_type, count(*)
from (
SELECT * FROM my_func(...point import foreign schema shard-1...)
union all
SELECT * FROM my_func(...point import foreign schema shard-2...)
union all
.... 6 more
) K
GROUP BY import_date, bedrooms, property_type;
However, each SELECT blocks until its call has finished, so the calls run one after another, which defeats the purpose.
As an alternative, I have so far tried dblink, since it can send queries without blocking (async):
SELECT dblink_connect('dtest1', 'host=xxxx ....');
SELECT dblink_connect('dtest2', 'host=xxxx ....');
.....
SELECT dblink_connect('dtest8', 'host=xxxx ....');
SELECT dblink_send_query('dtest1', 'select * from my_func(...)');
SELECT dblink_send_query('dtest2', 'select * from my_func(...)');
.....
SELECT dblink_send_query('dtest8', 'select * from my_func(...)');
and finally:
select import_date, bedrooms, property_type, count(*)
from (
SELECT * FROM dblink_get_result('dtest1') AS t1(....)
union all
SELECT * FROM dblink_get_result('dtest2') AS t2(....)
union all
.....
union all
SELECT * FROM dblink_get_result('dtest8') AS t8(....)
) K
GROUP BY import_date, bedrooms, property_type;
This is still "linear", and if I run the query again soon after the first run it may return no results (unless I wait a few seconds after the previous run).
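One possible explanation for the empty second run, going by the dblink documentation: after dblink_send_query, dblink_get_result has to be called once for the query and one additional time (returning an empty set) before the connection can be used again. Below is a minimal, self-contained sketch of the whole cycle, useful for checking whether the remote queries really overlap. The connection string 'dbname=postgres', the connection names demo1/demo2, and the pg_sleep(0.5) stand-in for my_func(...) are all assumptions for illustration, not my real setup.

```sql
CREATE EXTENSION IF NOT EXISTS dblink;

-- Hypothetical connections; replace the connection string with each shard's real one.
SELECT dblink_connect('demo1', 'dbname=postgres');
SELECT dblink_connect('demo2', 'dbname=postgres');

-- Stand-in for my_func(...): each "shard" query takes about 0.5 s.
-- dblink_send_query returns 1 if the query was dispatched, 0 otherwise.
SELECT dblink_send_query('demo1', 'SELECT 1 AS shard FROM pg_sleep(0.5)');
SELECT dblink_send_query('demo2', 'SELECT 2 AS shard FROM pg_sleep(0.5)');

-- 1 = the remote query is still running, 0 = a result is ready to be fetched.
SELECT dblink_is_busy('demo1') AS demo1_busy,
       dblink_is_busy('demo2') AS demo2_busy;

-- Collect the results; each dblink_get_result call waits for its own connection.
SELECT count(*)
FROM (
    SELECT * FROM dblink_get_result('demo1') AS t1(shard int)
    UNION ALL
    SELECT * FROM dblink_get_result('demo2') AS t2(shard int)
) k;

-- Drain: one extra call per connection returns an empty set
-- and makes the connection usable for the next dblink_send_query.
SELECT * FROM dblink_get_result('demo1') AS t1(shard int);
SELECT * FROM dblink_get_result('demo2') AS t2(shard int);

SELECT dblink_disconnect('demo1');
SELECT dblink_disconnect('demo2');
```

If the remote queries overlap, the collecting query should take about as long as the slowest single query rather than the sum. If it still takes the sum, the bottleneck is probably somewhere other than the dblink calls themselves.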
UPDATE 2
What looks like a working solution so far is what @Richard Huxton suggested: using PL/Proxy, which allows running remote functions (in parallel when you use CLUSTER with RUN ON ALL). I have also tried Citus, but I could not get the queries to run in parallel (percentile_cont is not supported), and it also would not change the concept of having a single machine with a partitioned table for this specific case, as you still have to pass through the query planner.
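For anyone following along, here is a rough sketch of what the PL/Proxy side can look like. It is an illustration under assumptions, not my exact code: the cluster name 'shards' and the proxy function name my_func_all_shards are hypothetical, the cluster configuration (the list of partitions) is assumed to exist already, and only three of the output columns of my_func are kept so that the elided ones do not have to be spelled out.

```sql
-- Assumes the plproxy extension is installed and a cluster called 'shards'
-- (hypothetical name) has already been configured with the 8 partitions.
CREATE OR REPLACE FUNCTION my_func_all_shards(
    pt character(1), bd smallint, polyg geometry, tf date, tt date,
    OUT import_date date, OUT bedrooms smallint, OUT property_type character(1))
RETURNS SETOF record AS $$
    CLUSTER 'shards';  -- which cluster definition to use
    RUN ON ALL;        -- run the query on every partition, in parallel
    -- Explicit remote query: call the existing my_func on each shard.
    SELECT import_date, bedrooms, property_type
    FROM my_func($1, $2, $3, $4, $5);
$$ LANGUAGE plproxy;
```

The proxy function is then queried on the "master" like any set-returning function, with the same GROUP BY import_date, bedrooms, property_type and count(*) as in the queries above; PL/Proxy collects the rows from all partitions into one result set.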