
I need to query 200+ tables in a database. Using a spark.sql(f"select ...") statement I get a column named col(0) (because the result of the query gives me information about the specific column I've retrieved) and the result of the calculation for a particular table, like this:

col(0)
1

My goal is to have one CSV file with the name of each table and the result of the calculation:

Table name    Count
accounting    3
sales         1

So far, this is the main part of my code:

list_tables = ['accounting', 'sales', ...]

for table in list_tables:
    df = spark.sql(
        f"""select distinct errors as counts from {database}.{table} where errors is not null""")

    df.repartition(1).write.mode("append").option("header", "true").csv(f"s3:.......")
    rename_part_file(dir, output, newdir)

I'm fairly new to PySpark and all the structures involved. So far I'm confused, because I've heard that iterating over DataFrames isn't the best idea.

Using the code above I get only one CSV with the most recent record, not all the processed tables from my list_tables. I'm stuck; I don't know if it's possible to pack all of it into one DataFrame, or if I should union the DataFrames.

1 Answer

I'm stuck; I don't know if it's possible to pack all of it into one DataFrame, or if I should union the DataFrames.

Both of the options you mentioned lead to the same thing: you have to iterate over the list of tables (you can't read multiple tables at once), execute a SQL statement against each of them and save the results into a DataFrame, then union all of the DataFrames and save them as a single CSV file. The sample code could look something like this:

from pyspark.sql.functions import lit
from functools import reduce

tables = ["tableA", "tableB", "tableC"]
dfs = []
for table in tables:
    # Run the SQL query for this table and tag every result row with the table name
    df_table = spark.sql(f"select distinct errors as counts from {table} where errors is not null")
    dfs.append(df_table.withColumn("TableName", lit(table)))

df = reduce(lambda df1, df2: df1.union(df2), dfs)  # Union all DFs
df.coalesce(1).write.mode("overwrite").option("header", "true").csv("my_csv.csv")  # Combine and write as a single file
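One thing to keep in mind: even with coalesce(1), the .csv(...) call writes a directory at the given path containing a single part-* file (plus a _SUCCESS marker), so a rename step like the rename_part_file in your question may still be needed if you want a single, literally-named CSV file.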

Note: the union operation takes into account only the position of the columns, not their names. I assume that is the desired behaviour in your case, as you are only extracting a single statistic.
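If you ever do need name-based matching instead (for example, when the per-table queries return their columns in different orders), unionByName is the alternative; a minimal sketch with two hypothetical single-row DataFrames:

# Hypothetical DataFrames whose columns appear in different orders
df_a = spark.createDataFrame([(3, "accounting")], ["counts", "TableName"])
df_b = spark.createDataFrame([("sales", 1)], ["TableName", "counts"])

df_all = df_a.unionByName(df_b)  # Columns are matched by name, not position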


1 Comment

Those 2 lines were exactly what I was looking for! Not sure why, but I had to use "from pyspark.sql import DataFrame" and df = reduce(DataFrame.unionAll, dfs), otherwise I got an incorrect syntax error from Spark SQL. Thanks!
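For reference, the variant described in the comment would look like this (DataFrame.unionAll is just an alias of union in Spark 2.0+):

from functools import reduce
from pyspark.sql import DataFrame

df = reduce(DataFrame.unionAll, dfs)  # Same result as the lambda-based union above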
