
I build Spark DataFrames from file paths, but now I would like to add the path, along with the time, to the resulting DataFrame as separate columns. Here is my current solution (pathToDF is a helper method):

val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()

if(paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}

I am trying to do something like this, but I am not sure how to add the time column as well using withColumn:

    val orderedPaths = pathsDF
      .orderBy($"time")
      .select($"path")
   //.select($"path", $"time") for both columns

    val paths = orderedPaths
      .as[String]
      .collect()

    if (paths.nonEmpty) {
      paths
        .grouped(groupsNum.getOrElse(paths.length))
        .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
          .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time") something like this
    } else {
      Seq.empty[DataFrame]
    }

What would be a better way to implement it?

Input DF:

time Long
path String

Current result:

resultDF schema
field1 Int
field2 String
....
fieldN String

Expected result:

resultDF schema
field1 Int
field2 String
....
path   String
time   Long
  • Can you show the schema of pathsDF? And can you add your input and expected output, if possible?
  • @Srinivas I have added the expected result and input.
  • Will join work if both are DataFrames?
  • Do you want to control which files are read from the path, i.e. read only x files out of y? If not, you can directly load all the files, no?
  • It looks like you need to extend pathToDataDF to consume time as well as path, and use the lit function to encode these literals as columns in the resulting DataFrame.

1 Answer


Please check the code below.

1. Change grouped to par for parallel data loading (a short plain-Scala sketch contrasting the two follows the snippets below).

2. Change this:

// The code below attaches the same path value to rows that came from multiple different files.
paths.grouped(groupsNum.getOrElse(paths.length))
     .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
     .withColumn("path", orderedPaths("path"))) 

to this:

// The code below attaches each path only to the content of that same file.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
    group.map(path => {
        pathToDataDF(path).withColumn("path", lit(path)) 
        }
    )
})
.reduceLeft(_ union _)
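
To illustrate point 1, here is a minimal plain-Scala sketch (no Spark) of the difference, assuming Scala 2.12 or earlier where .par is built in (Scala 2.13 needs the scala-parallel-collections module): grouped splits the collection into sequential batches, while par turns it into a parallel collection whose map runs concurrently.

val xs = Seq("a", "b", "c", "d")

// grouped: sequential batches of 2
xs.grouped(2).toList      // List(List(a, b), List(c, d))

// par: same elements, but map is evaluated in parallel
xs.par.map(_.toUpperCase) // ParSeq(A, B, C, D)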

For illustration, I have used both par and grouped below.

Note: ignore the details of methods like pathToDataDF; I have simply tried to replicate your helpers.

scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]

scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame

// Sample file content used in this example.

scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}

scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}

scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}

Using par

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
        case p if !p.isEmpty => {
            p.par
            .map(path => { 
                pathToDataDF(path)
                .withColumn("path",lit(path))
            }).reduceLeft(_ union _)
        }
        case _ => spark.emptyDataFrame
    }
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
            case p if !p.isEmpty => {
                p.par
                .map(path => {
                    pathToDataDF(path._1)
                    .withColumn("path",lit(path._1))
                    .withColumn("time",lit(path._2))
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+

Using grouped
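
The snippets below use groupsNum from the question; it is not defined in this session, so assume an Option[Int] along these lines (Some(2) is just an illustrative value):

scala> val groupsNum: Option[Int] = Some(2)
groupsNum: Option[Int] = Some(2)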

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => { 
                        pathToDataDF(path)
                        .withColumn("path", lit(path))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> groupedDF.show(false)

+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path._1)
                        .withColumn("path",lit(path._1))
                        .withColumn("time",lit(path._2))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }


groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
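
One caveat worth noting: the grouped variants above flatMap across groups and then reduceLeft, so they union everything into a single DataFrame, whereas the question's original code produced one DataFrame per group. If you still want that shape, here is a hedged sketch reusing the same pathToDataDF, groupsNum, and the (path, time) tuples collected above (lit comes from org.apache.spark.sql.functions):

// One unioned DataFrame per group, as in the question's original code.
val groupedDFs: Iterator[org.apache.spark.sql.DataFrame] =
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(group =>
      group.map { case (path, time) =>
        pathToDataDF(path)
          .withColumn("path", lit(path))
          .withColumn("time", lit(time))
      }.reduceLeft(_ union _)
    )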
