I have a CSV file (~1 GB, 10 million rows). I want to parse it in batches and then write the output to a new text/CSV file, where each line is a JSON array.

import java.io.{BufferedWriter, File, FileWriter}
import scala.io.Source

val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(outputFile))

Source
  .fromFile(fileName)
  .getLines()
  .grouped(batchSize)
  .foreach { chunk =>
    val jsonArray = doChunkTransformation(chunk)
    bw.write(jsonArray)
  }

bw.close()

Is that approach efficient, or should I wrap it with a Future? I'm quite new to Scala, so I may not be aware of all the available methods and solutions.

Important Note
Unfortunately, I'm constrained and cannot use any external libraries, so the solution has to be written in pure Scala.
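
For reference, since doChunkTransformation isn't shown above, here is a minimal pure-Scala sketch of what it could look like. It assumes each CSV line has simple comma-separated fields (no embedded commas, quotes, or newlines) and that each input line becomes one JSON array on its own output line; adjust to your actual format.

// Hypothetical sketch of doChunkTransformation, not the real implementation.
// Turns each CSV line into a JSON array of its fields, e.g. a,b,c -> ["a","b","c"].
def doChunkTransformation(chunk: Seq[String]): String = {
  // Quote and escape a field as a JSON string (backslashes first, then quotes).
  def toJsonString(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  chunk
    .map(line => line.split(',').map(toJsonString).mkString("[", ",", "]"))
    .mkString("", "\n", "\n") // one JSON array per line, newline-terminated
}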

3 Comments
  • In which sense should this be sufficient? Why Future? I really don't understand why some people are so reluctant to add an external library; this would be easily solved using something like fs2 or akka-streams. Commented Jan 22, 2021 at 13:17
  • @LuisMiguelMejíaSuárez We are using a closed internal Maven repository, where each library has to be verified. Commented Jan 22, 2021 at 13:22
  • @Forin I feel your pain. Commented Jan 22, 2021 at 14:04

1 Answer

The only optimization I can think of is parallelizing the doChunkTransformation execution, of course only if it's a relatively expensive operation; otherwise it doesn't make sense, because the I/O will take more time. For example:

import java.io.{BufferedWriter, File, FileWriter}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source

implicit val ec = scala.concurrent.ExecutionContext.global
val parallelism = Runtime.getRuntime().availableProcessors() * 2

val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(outputFile))

Source
  .fromFile(fileName)
  .getLines()
  .grouped(batchSize * parallelism)
  .foreach { chunk =>
    // Run the transformations in parallel: each outer chunk is split into
    // `parallelism` sub-chunks of `batchSize` lines, one Future per sub-chunk.
    // How many actually run at once depends on the `ExecutionContext`;
    // the global one defaults to the number of available processors.
    // Future.traverse preserves the order of the input collection.
    val computations = Future.traverse(chunk.grouped(batchSize).toSeq) { smallChunk =>
      Future(doChunkTransformation(smallChunk))
    }
    // Writing stays on the calling thread; BufferedWriter is not thread-safe.
    Await.result(computations, 1.minute).foreach(bw.write)
  }

bw.close()
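
If you want the parallelism level to be an explicit choice rather than whatever the global ExecutionContext happens to provide, you can back the Futures with a fixed-size thread pool. This is a sketch of an alternative setup, not part of the original answer; the pool size is an assumption:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A fixed-size pool caps how many doChunkTransformation calls run at once,
// independently of how the global ExecutionContext is configured.
val parallelism = Runtime.getRuntime().availableProcessors() * 2
val pool = Executors.newFixedThreadPool(parallelism)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// ... run the same Source / grouped / Future.traverse pipeline as above ...

// The pool uses non-daemon threads, so shut it down after bw.close(),
// otherwise the JVM will not exit.
pool.shutdown()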

UPD: I did not find any mention of preserving the original order in the Future.traverse docs: https://www.scala-lang.org/api/current/scala/concurrent/Future.html but I've prepared an example that shows the order remains the same: https://scastie.scala-lang.org/DUvwX1CXTl2kxrcez3uDgw

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

val random = new Random()
val computes = Future.traverse((1 to 10).toList) { num =>
  Future {
    // Sleep inside the Future so the computations actually run concurrently
    // and can finish out of order.
    val delay = random.nextInt(1000)
    println(s"Delay is $delay")
    Thread.sleep(delay)
    num.toString
  }
}
println(Await.result(computes, 1.minute))

At the end it prints out: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

10 Comments

The batchSize is defined by the user. Is val jsonArray in the correct place? I'm asking because it looks like it is not visible later, in write. Can you add a short description to the body of foreach? And what about the order? I assume it is not kept?
@Forin I don't know the signature of doChunkTransformation; the code above is an example. If the flatten operation is not found, it's probably because of the return type. Could you specify it, please?
@Forin No, you need to have smallerChunk => Future(doChunkTransformation(smallerChunk)); doChunkTransformation does not need to return a Future, leave it as is.
The order in which the operations are executed is unknown, but the results are always collected in order (which, by the way, makes the execution slower).
@LuisMiguelMejíaSuárez I have noticed one error: in Future.traverse(chunk.grouped(...)) it should be batchSize, not parallelism. Otherwise we get a batchSize-length iterator of parallelism-length Seqs (see the sketch below).
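
To make the batchSize vs. parallelism arithmetic from the last comment concrete, here is a quick sketch (the numbers are made up for illustration):

val batchSize   = 100
val parallelism = 8

// The outer grouped(batchSize * parallelism) yields chunks of 800 lines;
// chunk.grouped(batchSize) then splits each into 8 sub-chunks of 100 lines,
// i.e. one Future per sub-chunk.
val chunk = (1 to batchSize * parallelism).toSeq
println(chunk.grouped(batchSize).size) // 8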