
I'm trying to append data to my csv file using df.write.csv. This is what I did after following the Spark documentation http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter:

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append'

Executing the above code gives me error:

NameError: name 'append' is not defined

Without append, error:

The path already exists.

4 Comments
  • Is there already a file called sqlcsvA.csv? Commented Dec 19, 2016 at 8:31
  • Yes, the output is copied to the sqlcsvA.csv file. Commented Dec 19, 2016 at 8:58
  • Can you delete and recreate this file from code? Commented Dec 19, 2016 at 9:14
  • Are you suggesting I add a delete step in the code so that a new file is created each time the program runs? Commented Dec 19, 2016 at 10:04

3 Answers

df.write.save(path='csv', format='csv', mode='append', sep='\t')

3 Comments

This again splits the output into different files. It gets partitioned.
Include .coalesce(1) before the write; it will prevent partitioning, though I'm not sure whether the result will be appended: df.coalesce(1).write.save(path='csv', format='csv', mode='append', sep='\t')
Thanks. That got everything into one file.

From the docs (the csv method is available since v1.4): https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

e.g.

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

If you want to write a single file, you can use coalesce or repartition on either of those lines. It doesn't matter which line, because the DataFrame is just an execution plan (a DAG); nothing actually runs until the write to CSV. repartition and coalesce effectively use the same code, but coalesce can only reduce the number of partitions, whereas repartition can also increase them. I'd just stick to repartition for simplicity.

e.g.

df1 = sqlContext.createDataFrame(query1).repartition(1)

or

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

I think the examples in the docs aren't great; they don't show how to use any parameters other than the path.

Referring to the two things you tried:

(append)

For that to work, there would need to be a string variable named append containing the value "append". There is no string constant in the DataFrameWriter library called append; i.e. if you added append = "append" earlier in your code, the call would then work.
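As a minimal illustration of that point (a sketch reusing sqlContext and query1 from the question; the named-parameter form recommended below is still the cleaner fix):

append = "append"  # just a plain string variable; DataFrameWriter does not define this name for you
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append)  # mode is the second positional parameter, so this now means mode="append"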

('mode=append')

For that to work, the csv method would have to parse the string "mode=append" to extract the mode value, which would be extra work when you can simply pass a mode parameter whose value is exactly "append" or "overwrite". None is a special case, a Python built-in, not specific to PySpark.

On another note, I recommend using named parameters where possible. e.g.

csv(path="/path/to/file.csv", mode="append")

instead of positional parameters

csv("/path/to/file.csv", "append")

It's clearer, and helps comprehension.

Comments


I don't know about Python, but in Scala and Java one can set the save mode in the following way:

df.write.mode("append").csv("pathToFile")

I assume that it should be similar in Python. This may be helpful.
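For reference, the PySpark equivalent would presumably look like this (a sketch assuming an existing DataFrame df and the output path from the question):

# same builder-style call in PySpark: set the save mode, then write CSV
df.write.mode("append").csv("/opt/Output/sqlcsvA.csv")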

4 Comments

I tried what you said in Python, but each line of my output is copied into a separate CSV file inside one folder called sqlcsvA.csv. They are not written into one single CSV file.
@kaks, it seems like you will have to merge those files manually. Take a look at this question. For instance, people are using FileUtil.copyMerge in Java.
@kaks, note that if you read the results back (in Spark), those files are merged and you have a DataFrame that contains the data from all files in that directory.
You don't need to merge them manually; just use .repartition(1) when writing. When you read the files back into a DataFrame, it doesn't technically merge them, because the DataFrame is distributed across the cluster. Each file will be the basis of a DataFrame partition. So in a sense you do have one DataFrame, but it is still in many underlying pieces.
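To illustrate that last point, a sketch of reading the output directory back (assuming a SparkSession named spark and the path from the question):

# Spark treats the whole directory of part files as a single dataset
df_back = spark.read.csv("/opt/Output/sqlcsvA.csv", inferSchema=True)
print(df_back.rdd.getNumPartitions())  # typically one partition per underlying part file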
