
There was a question regarding this issue here:

Explode (transpose?) multiple columns in Spark SQL table

Suppose that we have extra columns as below:

userId    someString      varA       varB         varC      varD
   1      "example1"      [0,2,5]    [1,2,9]      [a,b,c]   [red,green,yellow]
   2      "example2"      [1,20,5]   [9,null,6]   [d,e,f]   [white,black,cyan]

To obtain an output like the one below:

userId    someString      varA     varB   varC     varD
   1      "example1"       0         1     a       red
   1      "example1"       2         2     b       green
   1      "example1"       5         9     c       yellow
   2      "example2"       1         9     d       white
   2      "example2"       20       null   e       black
   2      "example2"       5         6     f       Cyan

The answer was to define a UDF:

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

and to use it with withColumn:

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

If we need to extend the above answer to more columns, what is the easiest way to amend the above code? Any help, please.

3 Answers


The approach with the zip UDF seems OK, but you need to extend it for more collections. Unfortunately there is no really nice way to zip four Seqs, but this should work:

// Fail fast if the arrays to be zipped do not all have the same length.
def assertSameSize(arrs: Seq[_]*): Unit = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

// Zip four sequences element-wise into a sequence of 4-tuples.
val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
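
It can then be used exactly like the two-column zip in the question. A minimal usage sketch, assuming the same DataFrame df and column names as above:

df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB"),
  $"vars._3".alias("varC"), $"vars._4".alias("varD")).show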



I am assuming that the sizes of varA, varB, varC and varD stay the same, as in your example.

scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input

scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result

scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)

scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)

scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]

scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString|      varA|        varB|     varC|                varD|
+-------+----------+----------+------------+---------+--------------------+
|      1|  example1| [0, 2, 5]|   [1, 2, 9]|[a, b, c]|[red, green, yellow]|
|      2|  example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+

scala> def getResult(row : Input) : Iterable[Result] = {
     |             val user_id = row.user_id
     |             val someString = row.someString
     |             val varA = row.varA
     |             val varB = row.varB
     |             val varC = row.varC
     |             val varD = row.varD
     |             val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
     |             seq.toSeq
     |         }
getResult: (row: Input)Iterable[Result]

scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]

scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC|  varD|
+-------+----------+----+----+----+------+
|      1|  example1|   0|   1|   a|   red|
|      1|  example1|   2|   2|   b| green|
|      1|  example1|   5|   9|   c|yellow|
|      2|  example2|   1|   9|   d| white|
|      2|  example2|  20|null|   e| black|
|      2|  example2|   5|   6|   f|  cyan|
+-------+----------+----+----+----+------+

If the sizes of columns varA, varB, varC or varD differ, then those scenarios need to be handled.

You could iterate up to the maximum array size and output null values wherever a column has no value at that index (for example, by bounds-checking or catching the out-of-range exception).
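
For instance, a sketch of that idea, reusing the Input and Result classes above (the helper getResultPadded is hypothetical): pad up to the longest array and emit null where a value is missing.

// Hypothetical variant of getResult that tolerates arrays of different lengths.
def getResultPadded(row: Input): Iterable[Result] = {
  // Element at index i, or null when the index is out of range.
  def at[T >: Null](arr: Array[T], i: Int): T =
    if (i < arr.length) arr(i) else null

  val maxSize = Seq(row.varA.length, row.varB.length, row.varC.length, row.varD.length).max
  (0 until maxSize).map { i =>
    Result(row.user_id, row.someString,
      at(row.varA, i), at(row.varB, i), at(row.varC, i), at(row.varD, i))
  }
}

Calling input_df.flatMap(row => getResultPadded(row)) would then yield rows with null in the shorter columns.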

Comments

Thanks Ankush. What if the data was read from an external source such as HDFS? How can we fill up the arrays created in case class Input(user_id: Integer, someString: String, varA: Array[Integer], varB: Array[Integer], varC: Array[String], varD: Array[String])?
That becomes a separate question. What kind of data do you load? What format is it in (CSV/JSON)? You need to reformat the data.
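
For example, if the data were stored in HDFS as line-delimited JSON, one possible sketch (the path and file layout here are made up) is to read it with an explicit schema and convert it to the Input Dataset:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("user_id", IntegerType),
  StructField("someString", StringType),
  StructField("varA", ArrayType(IntegerType)),
  StructField("varB", ArrayType(IntegerType)),
  StructField("varC", ArrayType(StringType)),
  StructField("varD", ArrayType(StringType))))

// Hypothetical HDFS location; one JSON object per line.
// (In a compiled application you would also need import spark.implicits._.)
val input_df = spark.read.schema(schema).json("hdfs:///path/to/input.json").as[Input]

CSV would need an extra parsing step, since its cells are plain strings rather than arrays.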

If you want to extend the UDF for more columns, do as below:

val zip = udf((xs: Seq[String], ys: Seq[String], zs: Seq[String]) =>
  for (((x, y), z) <- xs zip ys zip zs) yield (x, y, z))

df.withColumn("vars", explode(zip($"varA", $"varB", $"varC"))).select(
  $"userId", $"someString", $"vars._1".alias("varA"),
  $"vars._2".alias("varB"),$"vars._3".alias("varC")).show

This logic can be applied to n columns as required; see the sketch below.
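
For example, the same pattern stretched to four columns (a sketch, using the column types from the question's example):

val zip4 = udf((xs: Seq[Long], ys: Seq[Long], zs: Seq[String], ws: Seq[String]) =>
  for ((((x, y), z), w) <- xs zip ys zip zs zip ws) yield (x, y, z, w))

The withColumn/explode/select call keeps the same shape, with one extra vars._N alias per added column.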

