
I am trying to solve a simple log-parsing problem: given N log files logfile.20160601.txt, logfile.29169692.txt, ... I want to save a parquet file with the date key of each log.

To accomplish that, I found the following way to get the input split path.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import sqlContext.implicits._ // needed for toDF (Spark 1.x)

// mapPartitionsWithInputSplit is defined on HadoopRDD, so load with hadoopFile rather than textFile
val data = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/logdirectory/*.*")
val logsWithFileName = data.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit { (inputSplit, iterator) =>
    val file = inputSplit.asInstanceOf[FileSplit]
    val logDateKey = getDatekeyFromPath(file)

    // tag every line of this split with its file's date key
    iterator.map { tpl => (logDateKey, tpl._2.toString) }
  }
val logs = logsWithFileName.map(item => LogItem(item._1, item._2))
val df = logs.toDF
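
For completeness, LogItem and getDatekeyFromPath are not defined in the question. A minimal sketch, assuming the date key is the second dot-separated token of the file name (as in logfile.20160601.txt):

import org.apache.hadoop.mapred.FileSplit

// Hypothetical case class backing the DataFrame. Note that the later
// partitionBy("logdatekey", "hostname") implies a hostname column as well,
// which is not visible in the snippet above.
case class LogItem(logdatekey: String, message: String)

// Hypothetical helper: derive the date key from the split's file name,
// assuming names of the form logfile.<datekey>.txt
def getDatekeyFromPath(split: FileSplit): String =
  split.getPath.getName.split('.')(1)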

Now I try to save the DataFrame:

df.write.partitionBy("logdatekey", "hostname").save("C:/temp/results.parquet")

but I receive this message

Output directory file:/C:/temp/results.parquet already exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/C:/temp/results.parquet already exists
    at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)

Has anyone experienced this strange behavior? Could it be related to the use of the input split?

Many thanks in advance, Rob

  • Have you deleted all the files that are in C:/temp/results.parquet? Commented Jun 17, 2016 at 8:08
  • Yes, I did; the test suite cleans all the content before running the test (one way to do that is sketched below). Commented Jun 17, 2016 at 8:23
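
For reference, a minimal sketch of such a cleanup step using the Hadoop FileSystem API (the output path and the recursive delete flag are assumptions for illustration, not code from the question):

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical test-suite cleanup: remove the output directory, if present,
// before Spark writes to it
val outputPath = new Path("C:/temp/results.parquet")
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(outputPath)) fs.delete(outputPath, true) // true = recursive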

1 Answer


Well, your error message says it all: you are trying to write to an output location that already exists.

You just need to check what the available save operations are:

Save operations can optionally take a SaveMode, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.

  • SaveMode.ErrorIfExists (default behavior) - when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

  • SaveMode.Append - when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

  • SaveMode.Overwrite - means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

  • SaveMode.Ignore - means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

So in your case, if you want to overwrite your existing data, you should do the following:

import org.apache.spark.sql.SaveMode

df.write.partitionBy("logdatekey", "hostname").mode(SaveMode.Overwrite).save("C:/temp/results.parquet")
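
Equivalently, the mode can be passed as a string; a small variation on the snippet above, not from the original answer:

// "overwrite" is the string form of SaveMode.Overwrite and needs no import
df.write.partitionBy("logdatekey", "hostname").mode("overwrite").save("C:/temp/results.parquet")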
