
I am reading multiple files from an HDFS directory, and for each file the generated data is printed using:

frequencies.foreach(x => println(x._1 + ": "+x._2))

And the printed data is (for File1.txt):

'text': 45
'data': 100
'push': 150

The key can be different for other files like (File2.txt):

'data': 45
'lea': 100
'jmp': 150

The key is not necessarily the same in all the files. I want all the file data to be written to a .csv file in the following format:

Filename   text  data  push  lea  jmp
File1.txt  45    100   150   0    0
File2.txt  0     45    0     100  150  ....

Can someone please help me find a solution to this problem?

2 Answers


If your files aren't too big, you can do this without Spark. Here is my example code; the CSV format is plain and doesn't exactly match your expected output, but you can tweak it easily.

  import java.io.PrintWriter
  import scala.io.Source
  import org.apache.hadoop.fs._

  val sparkSession =   ...  // created only to retrieve the Hadoop configuration; you can create your own Configuration instead
  val inputPath =   ...
  val outputPath =   ...

  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

  // read every .txt file's content into (fileName, Map[key, value])
  val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt"))
    .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
                    .map(_.split(":").map(_.trim))
                    .filter(_.length == 2)
                    .map(p => (p.head, p.last)).toMap))

  // collect every key seen in any file; these become the CSV columns
  val allKeys = filesContent.flatMap(_._2.keys).distinct

  // build one CSV row per file, looking each column key up explicitly and
  // filling missing keys with "0" so the values stay aligned with the header
  val csvContent = filesContent
    .map { case (name, kv) => (name +: allKeys.map(k => kv.getOrElse(k, "0"))).mkString(",") }
    .mkString("\n")

  val csvHeader = ("Filename" +: allKeys.toList).mkString(",")
  val csv = csvHeader + "\n" + csvContent

  // write the CSV back to HDFS
  new PrintWriter(fs.create(new Path(outputPath))){
    write(csv)
    close()
  }
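
For the sample data in the question (assuming File1.txt is listed first, and noting that the single quotes stay part of the keys because the lines are only split on ':'), the output would look roughly like:

  Filename,'text','data','push','lea','jmp'
  File1.txt,45,100,150,0,0
  File2.txt,0,45,0,100,150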



I'd suggest creating one DataFrame for all the files inside your directory and then using a pivot to reshape the data accordingly:

val df1 = sc.parallelize(Array(
  ("text", 45),
  ("data", 100),
  ("push", 150))).toDF("key", "value").withColumn("Filename", lit("File1"))

val df2 = sc.parallelize(Array(
  ("data", 45),
  ("lea", 100),
  ("jump", 150))).toDF("key", "value").withColumn("Filename", lit("File2"))

val df = df1.unionAll(df2)

df.show
+----+-----+--------+
| key|value|Filename|
+----+-----+--------+
|text|   45|   File1|
|data|  100|   File1|
|push|  150|   File1|
|data|   45|   File2|
| lea|  100|   File2|
|jump|  150|   File2|
+----+-----+--------+


val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value") ).na.fill(0)

finalDf.show
+--------+----+----+---+----+----+
|Filename|data|jump|lea|push|text|
+--------+----+----+---+----+----+
|   File1| 100|   0|  0| 150|  45|
|   File2|  45| 150|100|   0|   0|
+--------+----+----+---+----+----+

You can write it as a CSV using DataFrameWriter

finalDf.write.csv(..)

The hard part with this approach is creating a separate DataFrame for each file, with an extra column holding the Filename the DataFrame was created from.
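
For instance, one way to do that (just a sketch, assuming a spark-shell-style session where sc and the toDF implicits are available, and reusing a Hadoop FileSystem handle fs and an inputPath as in the first answer; the name perFileDfs is purely illustrative):

  import scala.io.Source
  import org.apache.hadoop.fs._
  import org.apache.spark.sql.functions.lit

  // build one small DataFrame per input file and tag it with the file's name
  val perFileDfs = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).map { p =>
    val pairs = Source.fromInputStream(fs.open(p)).getLines()
      .map(_.split(":").map(_.trim))
      .filter(_.length == 2)
      .map(a => (a(0), a(1).toInt))
      .toList
    sc.parallelize(pairs).toDF("key", "value").withColumn("Filename", lit(p.getName))
  }.toSeq

  // combine them into the single df used above
  val df = perFileDfs.reduce(_ unionAll _)

Note that reduce(_ unionAll _) still chains one union per file, which can become a problem with very large numbers of files (see the last comment below).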

5 Comments

I am not able to write finalDf to CSV using df.write.csv; I get this error: value csv is not a member of org.apache.spark.sql.DataFrameWriter. Thanks @philantrovert
If you're using Spark 1.6, you'll need to add the databricks csv jar to your application, and the code changes to df.write.format("com.databricks.spark.csv"). More info here: github.com/databricks/spark-csv
Sorry, but I've searched a lot and couldn't find how to build a jar from this GitHub repo or how to add it to my application. It would be a great help if you could list those steps here. Thanks @philantrovert
I've figured it out. Thanks @philantrovert
When I run this code on 11,00 files (dataset text files), I get java.lang.StackOverflowError. I call a function for each file, the union is performed with the previous DataFrames, and this continues for every file. Could you please suggest what should be done to avoid this error without slowing things down?
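
Regarding the StackOverflowError in the last comment: chaining unionAll once per file builds a very deep query plan, and with thousands of files that is what overflows the stack. One possible workaround (a sketch only, assuming Spark 1.6, that all per-file DataFrames share the same schema, and that they are collected in a sequence like the illustrative perFileDfs above) is to drop to the RDD level and union everything in one flat step:

  import org.apache.spark.sql.DataFrame

  def unionMany(dfs: Seq[DataFrame]): DataFrame = {
    val sqlContext = dfs.head.sqlContext
    // SparkContext.union merges all the underlying RDDs in a single step,
    // avoiding thousands of nested unionAll nodes in the query plan
    sqlContext.createDataFrame(sqlContext.sparkContext.union(dfs.map(_.rdd)), dfs.head.schema)
  }

  val df = unionMany(perFileDfs)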
