
I'm using Spark with Scala.

I have a DataFrame with 3 columns: ID, Time, RawHexdata. I have a user-defined function which takes RawHexdata and expands it into X more columns. It is important to state that X is the same for every row (the columns do not vary). However, before I receive the first data, I do not know what the columns are. But once I have the head, I can deduce them.

I would like a second DataFrame with said columns: Id, Time, RawHexdata, NewCol1, ..., NewColX.

The "easiest" method I can think of to do this is: 1. serialize each row into JSON (every data type is serializable here), 2. add my new columns, 3. deserialize a new DataFrame from the altered JSON.

However, that seems like a waste, as it involves two costly and redundant JSON serialization steps. I am looking for a cleaner pattern.

Using case classes seems like a bad idea, because I don't know the number of columns or the column names in advance.

  • Could you provide more details? An example of data contained in RawHexdata maybe. Commented Sep 15, 2015 at 8:05
  • You can always apply the .withColumn() function only after some conditions have been satisfied. Commented Sep 15, 2015 at 8:26
  • RawHexdata is a giant binary blob sent in by a bunch of embedded devices. It contains data which will be deserialized into other flat numeric data: doubles, ints, complex numbers and such. I would later like to enable an analyst to query this data with Spark SQL. However, when the data is in a blob this is not possible, so I have written a UDF "parseBlob" which takes a blob and returns a map/JSON object (I can change the return type to suit the solution). I would like the contents of this map to be the columns in another table, where each row is related to the original raw data. Commented Sep 15, 2015 at 8:29
  • @niemand, withColumn allows one column at a time. Is there any way I can use withColumn without reparsing the entire blob for every column I add (say I want to add 3 columns, for example)? If so, I could easily add a function that adds multiple columns by calling withColumn repeatedly. However, every syntax of withColumn that I can think of requires reparsing the raw data multiple times per row. I'm not very experienced with Scala; maybe there is some way... Commented Sep 15, 2015 at 9:11
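
The parse-once concern in the last two comments can be addressed with a UDF that returns a struct: Spark decodes the blob a single time per row, and `parsed.*` expands every struct field into its own column. A minimal sketch (Spark 2.x shell-style API; the `Decoded` case class and the length-based decoding are placeholders for the real blob format):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[2]").appName("ParseOnce").getOrCreate()
import spark.implicits._

val df = Seq(("id1", 100L, "0a0b"), ("id2", 200L, "0c0d"))
  .toDF("ID", "Time", "RawHexdata")

// Stand-in decoder: derives two numbers from the hex string. Returning a
// case class gives the UDF a struct result, so the blob is parsed once per row.
case class Decoded(len: Int, half: Int)
val parseBlob = udf { hex: String => Decoded(hex.length, hex.length / 2) }

// "parsed.*" expands every struct field into its own column.
val expanded = df
  .withColumn("parsed", parseBlob(col("RawHexdata")))
  .select(col("*"), col("parsed.*"))
  .drop("parsed")

expanded.show()
```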

2 Answers


What you can do to dynamically extend your DataFrame is to operate on the row RDD, which you can obtain by calling dataFrame.rdd. Given a Row instance, you can access the RawHexdata column and parse the contained data. By adding the newly parsed columns to the resulting Row, you've almost solved your problem. The only thing still needed to convert an RDD[Row] back into a DataFrame is to generate the schema for your new columns. You can do this by collecting a single RawHexdata value on your driver and then extracting the column types.

The following code illustrates this approach.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object App {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
    val dataFrame = input.toDF()

    dataFrame.show()

    // create the extended rows RDD: parse the "blob" (here the age column)
    // once per row and append the derived values
    val rowRDD = dataFrame.rdd.map { row =>
      val blob = row(1).asInstanceOf[Int]
      val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
      // init drops the blob column; the parsed columns replace it
      Row.fromSeq(row.toSeq.init ++ newColumns)
    }

    val schema = dataFrame.schema

    // we know that the new columns are all integers
    val newColumns = StructType(
      Seq(StructField("1", IntegerType), StructField("2", IntegerType), StructField("3", IntegerType))
    )

    val newSchema = StructType(schema.init ++ newColumns)

    val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)

    newDataFrame.show()
  }
}

2 Comments

Thanks, though I don't know the type of each specific numerical value. I can add a "switch" and build the Seq function
Exactly @eshalev, assuming that all your RawHexdata contain the same columns, you can collect one RawHexdata object and calculate the data types for the resulting columns.
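
The schema-from-a-sample step described in this comment can be sketched as follows. Here `parseBlob` returning a `Map[String, Any]` is an assumption (it is the "switch" the commenters mention), and the match only covers a few numeric types:

```scala
import org.apache.spark.sql.types._

// Map a sample Scala value to a Spark SQL type; extend the match as needed.
def fieldFor(name: String, value: Any): StructField = value match {
  case _: Int    => StructField(name, IntegerType)
  case _: Long   => StructField(name, LongType)
  case _: Double => StructField(name, DoubleType)
  case _         => StructField(name, StringType)
}

// Stand-in for parseBlob(firstRawHexdata) collected on the driver.
val sample: Map[String, Any] = Map("voltage" -> 3.3, "count" -> 7)

// Sort by name for a deterministic column order across runs.
val newFields = sample.toSeq.sortBy(_._1).map { case (n, v) => fieldFor(n, v) }
```

The resulting `newFields` can be appended to the existing schema exactly as `schema.init ++ newColumns` is in the answer above.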

SELECT is your friend: it solves this without going back to the RDD.

import org.apache.spark.sql.functions.{col, lit}

case class Entry(Id: String, Time: Long)

val entries = Seq(
  Entry("x1", 100L),
  Entry("x2", 200L)
)

val newColumns = Seq("NC1", "NC2", "NC3")

val df = spark.createDataFrame(entries)
  .select(col("*") +: newColumns.map(c => lit(null).as(c)): _*)

df.show(false)

+---+----+----+----+----+
|Id |Time|NC1 |NC2 |NC3 |
+---+----+----+----+----+
|x1 |100 |null|null|null|
|x2 |200 |null|null|null|
+---+----+----+----+----+
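
If the real values are wanted instead of null placeholders, the same select pattern works with a map-returning UDF and `getItem`, with the column names discovered from one sample row as discussed under the question. A sketch, where the decoder is a placeholder for the asker's blob parser:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[2]").appName("SelectParsed").getOrCreate()
import spark.implicits._

val df = Seq(("x1", 100L, "0a0b"), ("x2", 200L, "0c0d0e"))
  .toDF("Id", "Time", "RawHexdata")

// Placeholder decoder; real code would parse the binary blob.
val parseBlob = udf { hex: String => Map("NC1" -> hex.length, "NC2" -> hex.length / 2) }

// Names discovered by parsing one sample blob on the driver.
val newColumns = Seq("NC1", "NC2")

// The blob is parsed once into a map column, then each name is
// projected out with getItem; finally the map column is dropped.
val parsed = df
  .withColumn("p", parseBlob(col("RawHexdata")))
  .select(col("*") +: newColumns.map(n => col("p").getItem(n).as(n)): _*)
  .drop("p")

parsed.show(false)
```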

