I am trying to solve a simple log-parsing problem: given N log files, logfile.20160601.txt, logfile.20160602.txt, ..., I want to save the parsed logs as Parquet files keyed by each log's date.
To accomplish this, I found the following way to get the input split path:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// sc.textFile hides the underlying HadoopRDD, so load via hadoopFile instead.
val data = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/logdirectory/*.*")
val logsWithFileName = data.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit { (inputSplit, iterator) =>
    val file = inputSplit.asInstanceOf[FileSplit]
    val logDateKey = getDatekeyFromPath(file)
    iterator.map { tpl => (logDateKey, tpl._2.toString) }
  }
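(getDatekeyFromPath is my own helper; a minimal sketch of it, assuming file names of the form logfile.YYYYMMDD.txt, would be:)

// Minimal sketch, not my exact code: takes the middle token of the
// file name (e.g. logfile.20160601.txt -> 20160601) as the date key.
def getDatekeyFromPath(split: FileSplit): String =
  split.getPath.getName.split('.')(1)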
val logs = logsWithFileName.map(item => LogItem(item._1, item._2))  // LogItem is a simple case class of mine
val df = logs.toDF  // needs the SQLContext implicits in scope, e.g. import sqlContext.implicits._
Now I try to save the DataFrame:
df.write.partitionBy("logdatekey", "hostname").save("C:/temp/results.parquet")
but I receive this message:
Output directory file:/C:/temp/results.parquet already exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/C:/temp/results.parquet already exists
at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
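For what it's worth, I believe the default save mode is ErrorIfExists, so the call above should be equivalent to spelling the mode out explicitly (same names as in my snippet):

import org.apache.spark.sql.SaveMode

// SaveMode.ErrorIfExists is, as far as I know, the default:
// the write fails when the target path already exists.
df.write.mode(SaveMode.ErrorIfExists).partitionBy("logdatekey", "hostname").save("C:/temp/results.parquet")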
Has anyone experienced this strange behavior? Could it be related to the use of the input split?
Many thanks in advance, Rob