
I am new to Scala and I am trying to write code that first reads the list of files in a folder and then loads each of those CSV files into HDFS.

As of now I am iterating through all the CSV files using a for loop, but I want to implement this using multithreading, such that each thread takes care of one file and performs the end-to-end process on it.

My current implementation is:

val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

for (file <- fileArray) {
    // read the CSV file from the shared location into a DataFrame
    val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

    // destination location in HDFS
    val finalDestination: String = destination + file.getName

    // save the DataFrame to HDFS (written with the default number of partitions = 1)
    writeDF2HDFS(df, fileFormat, "true", finalDestination)
}

I was trying to look into Scala's Future API, but I have not been able to understand its usage properly so far.

Any pointers on how Scala's Future API could help me here would be a great help.

Regards, Bhupesh

  • Could you please post signatures (and, if possible, implementations) of loadCSV2DF and writeDF2HDFS? With Futures the basic approach would look like fileArray.map(file => loadCSV2DF(...)).flatMap(df => writeDF2HDFS(...)), but loadCSV2DF and writeDF2HDFS would have to return Futures in that case (see the sketch after these comments). Commented Jul 13, 2017 at 13:05
  • loadCSV2DF() and writeDF2HDFS() just provide a basic implementation: reading a file using Spark into a DataFrame, and finally saving/writing that DataFrame to HDFS using the DataFrame's save(). Could you please provide some basic implementation? When you say these methods should return a Future, I am not sure how, or what that would do. Is there any reference doc which is easy to understand and where I could see such examples? Commented Jul 13, 2017 at 15:11
  • I haven't used Spark myself, so my "traditional Scala" approach seems not to be applicable - see the [Using Futures within Spark](stackoverflow.com/questions/37478871/using-futures-within-spark) question. So it doesn't seem Futures would be helpful in this case. Commented Jul 16, 2017 at 4:09
  • Thanks for responding @J0HN Commented Jul 16, 2017 at 12:37
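
For reference, the "traditional Scala" approach from the first comment would look roughly like the sketch below. It assumes loadCSV2DF and writeDF2HDFS keep the signatures used in the question (returning a plain DataFrame and Unit), so each end-to-end task is wrapped in a Future instead; whether driver-side Futures are a good fit for Spark is exactly what the linked question discusses.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val fileArray: Array[java.io.File] =
  new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

// wrap the end-to-end processing of one file in a Future, so each
// file is handled on a thread from the implicit execution context
val tasks: Seq[Future[Unit]] = fileArray.toSeq.map { file =>
  Future {
    val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
    writeDF2HDFS(df, fileFormat, "true", destination + file.getName)
  }
}

// turn the Seq[Future[Unit]] into one Future and block until every file is written
Await.result(Future.sequence(tasks), Duration.Inf)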

2 Answers


You can split the processing of each file across multiple threads by converting the array of files into a parallel collection with the par method:

for (file <- fileArray.par) {
  // the loop body is executed concurrently across multiple threads
}

It's still up to you to combine the results in a thread-safe manner, though.
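
For example, rather than mutating shared state inside the loop, you can let map on the parallel collection gather the per-file results for you; the framework assembles the resulting collection safely. A sketch reusing the helper names from the question (the writtenPaths variable is just illustrative; note that on Scala 2.13+ par additionally requires the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._):

// each element is processed on its own worker thread; the results
// are combined into a new collection by the framework, not by us
val writtenPaths = fileArray.par.map { file =>
  val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
  val finalDestination = destination + file.getName
  writeDF2HDFS(df, fileFormat, "true", finalDestination)
  finalDestination // per-element result; no shared mutable state needed
}.seq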


1 Comment

  • Thanks for responding @Simon. Does that mean that if I write .par within the loop, each file will be assigned to one thread performing the end-to-end implementation defined within the block? I may try this once I am in front of my code. In the meantime, could you please let me know: does adding .par alone not take care of thread safety?

What about putting the body of your for loop into a function and amending the loop accordingly? Let's say you first convert your fileArray to a list of filename strings. Then:

import java.io.File

val fileArrayNames: Array[String] =
  new File(source).listFiles.filter(_.getName.endsWith(".csv")).map(_.getName)

def processFile(filename: String): Unit = {
    val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", filename)
    val finalDestination: String = destination + filename
    writeDF2HDFS(df, fileFormat, "true", finalDestination)
}

fileArrayNames.foreach(file => processFile(file))
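
Note that foreach still processes the files one at a time; the gain from the refactoring is that the iteration strategy becomes pluggable. For instance, the same function combines directly with the .par approach from the other answer:

fileArrayNames.par.foreach(file => processFile(file))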

