
I need to write a Spark DataFrame to a Postgres DB. I have used the following:

df.write \
    .option("numPartitions", partitions) \
    .option("batchsize", batchsize) \
    .jdbc(url=url, table="table_name", mode="append", properties=properties)

This works fine; however, I want to compare the performance with the Postgres COPY command.

I tried the following:

output = io.StringIO()

csv_new.write \
    .format("csv") \
    .option("header", "true") \
    .save(path=output)

output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="")  # using psycopg2
con_bb.commit()

This does not seem to work; it fails with the error: 'type' object is not iterable.

The same approach worked well with a Pandas DataFrame:

output = io.StringIO()
df.to_csv(path_or_buf=output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")
con_bb.commit()

Any leads on how to implement the Pandas equivalent in PySpark? P.S.: It's performance-critical, so converting the Spark DataFrame to a Pandas DataFrame is not an option. Any help would be greatly appreciated.

3 Answers


What currently works very well for me (100-200 GB of CSV files with around 1,000,000,000 rows) is using psycopg2 together with multiprocessing.

Available cores: 200

First I export the Spark DataFrame into a number of files that is a multiple of the available cores:

filepath="/base_path/psql_multiprocessing_data"

df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \ # even faster using binary format, but ok with csv
    .save(filepath,header='false')

Then I iterate over all files in the folder in parallel:

import glob
import psycopg2
from multiprocessing import Pool, cpu_count

file_path_list = sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # uncomment to skip a header row
        cursor.copy_from(f, 'my_schema.my_table', sep=",")
    con.commit()
    con.close()
    return fileName


with Pool(cpu_count()) as p:
    p.map(psql_copy_load, file_path_list)

print("parallelism (cores): ", cpu_count())
print("files processed: ", len(file_path_list))

I did not try exporting the data in binary format any further because it got complicated to get the headers and data types right, and I was happy with the run time of around 25-30 minutes (with 6 columns).
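
One caveat worth noting (a hedged addition, not part of the original answer): copy_from parses its input with Postgres' plain text COPY format, so any rows that Spark's CSV writer had to quote (embedded commas, quotes or newlines) will fail to load. A minimal sketch of a variant using copy_expert with an explicit CSV COPY command, reusing the same placeholder connection settings and table name as above:

import psycopg2

def psql_copy_load_csv(fileName):
    # Same placeholders as above (my_db, my_user, my_schema.my_table, ...).
    con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                           host="my_host", port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # FORMAT csv makes Postgres apply CSV quoting rules while parsing
        cursor.copy_expert("COPY my_schema.my_table FROM STDIN WITH (FORMAT csv)", f)
    con.commit()
    con.close()
    return fileName

It can be dropped into the Pool.map call above in place of psql_copy_load.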


To my knowledge, Spark does not provide a way to use the COPY command internally.

If you want to load Postgres from HDFS, you might be interested in Sqoop. It allows you to export a CSV stored on HDFS, and it can issue multiple COPY statements. In my experiments, using 4 mappers sped up the ingestion by a factor of 2 compared to a single mapper. This should be considerably faster than going through Spark's JDBC writer.

Here are the steps:

  1. df.write.csv("my/hdfs/folder")
  2. sqoop export --connect "jdbc:postgresql://postgres_host/postgres_db" --username --password-file file:///home/$USER/.password --export-dir my_csv_table --table -m 4 --direct --lines-terminated-by '\n' --fields-terminated-by ',' -- --schema



You can try the Postgres extension aws_s3: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.S3Import.html

bucket = "my-bucket"
key = "my/s3/key.csv.gz"

df.write.mode("overwrite").csv(f"s3a://{bucket}/{key}", compression="gzip")
-- in postgres

aws_s3.table_import_from_s3 (
   table_name text, 
   column_list text, 
   options text, 
   s3_info aws_commons._s3_uri_1
) 
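
A minimal sketch of invoking that function from Python with psycopg2, assuming the target table already exists, the aws_s3/aws_commons extensions are installed, and using placeholder names (my_table, my-bucket, region, part file key) that are not from the answer. Note that Spark writes a directory of part files, so the import would be repeated once per part file:

import psycopg2

# Placeholders throughout: connection settings, table, bucket, key and region.
con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                       host="my_rds_host", port="5432")
cursor = con.cursor()
cursor.execute("""
    SELECT aws_s3.table_import_from_s3(
        'my_table',                -- target table
        '',                        -- empty column_list = all columns
        '(format csv)',            -- COPY options
        aws_commons.create_s3_uri('my-bucket', 'my/s3/key/part-00000.csv', 'us-east-1')
    )
""")
con.commit()
con.close()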
