3

I have a data-frame with 4 fields as mentioned below :

Field1 , Field2 , Field3 , Field4

I have values in the fields as below :

A1      , B1         , C1         , D1                
A2      , B2,B3      , C2,C3      , D2,D3             
A1      , B4,B5,B6   , C4,C5,C6   , D4,D5,D6          

I have to convert it into the below format :

A1      , B1         , C1         , D1          
A2      , B2         , C2         , D2            
A2      , B3         , C3         , D3      
A1      , B4         , C4         , D4      
A1      , B5         , C5         , D5      
A1      , B6         , C6         , D6      

Basically I have to split the comma separated values in multiple columns and form new rows based on the values in the same order.

You can consider all of them as of type String. Can you suggest me a way to do this splitting and forming new rows based on the new values.

I could see already a question similar to this as the below one:

How to flatmap a nested Dataframe in Spark

But this question is different as I have to consider splitting multiple columns in this case and the values should not repeat.

1
  • The duplicate question marked is different from this . I have multiple columns here to split at the same time. Commented Jul 14, 2016 at 22:17

2 Answers 2

3

You can convert DataFrame to Dataset[(String, String, String, String)] and flatMap:

import scala.util.Try

val df = Seq(
  ("A1", "B1", "C1", "D1"),
  ("A2", "B2,B3", "C2,C3", "D2,D3"),
  ("A1", "B4,B5,B6", "C4,C5,C6", "D4,D5,D6")
).toDF("x1", "x2", "x3", "x4")

// A simple sequence of expressions which allows us to flatten the results
val exprs = (0 until df.columns.size).map(i => $"value".getItem(i))

df.select($"x1", array($"x2", $"x3", $"x4")).as[(String, Seq[String])].flatMap {
  case (x1, xs) => 
    Try(xs.map(_.split(",")).transpose).map(_.map("x" +: _)).getOrElse(Seq())
}.toDF.select(exprs:_*)

// +--------+--------+--------+--------+
// |value[0]|value[1]|value[2]|value[3]|
// +--------+--------+--------+--------+
// |      A1|      B1|      C1|      D1|
// |      A2|      B2|      C2|      D2|
// |      A2|      B3|      C3|      D3|
// |      A1|      B4|      C4|      D4|
// |      A1|      B5|      C5|      D5|
// |      A1|      B6|      C6|      D6|
// +--------+--------+--------+--------+

or use an UDF:

val splitRow = udf((xs: Seq[String]) => 
   Try(xs.map(_.split(",")).transpose).toOption)

// Same as before but we exclude the first column
val exprs = (0 until df.columns.size - 1).map(i => $"xs".getItem(i))

df
  .withColumn("xs", explode(splitRow(array($"x2", $"x3", $"x4"))))
  .select($"x1" +: exprs: _*)
Sign up to request clarification or add additional context in comments.

3 Comments

Great! Your solution is nice and works for the given input. But if i change the input as below : val df = Seq( ("A2", "B2,B3", "C2,C3", "D2,D3"), ).toDF("x1", "x2", "x3", "x4") it is failing and giving error in transpose. Can you provide me a general solution.
Can you provide a traceback? All strings have to be of the same size to make it work.
Awsome! It works so fluently. Hope you enjoyed answering it . ;)
0

You can use posexplode to solve this quickly. Refer http://allabouthadoop.net/hive-lateral-view-explode-vs-posexplode/ So, your code will be like below :

select 
    Field1, 
    Field2,
    Field3,
    Field4 
from temp_table
lateral view posexplode(Field2) pn as f2_1,f2_2, Field2
lateral view posexplode(Field3) pn as f3_1,f3_2, Field3 
lateral view posexplode(Field3) pn as f4_1,f4_2, Field4
where 
    (f2_1 == F3_1 and f3_1 == f4_1) and/or (f2_2 == F3_2 and f3_2 == f4_2)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.