
I have two DataFrames, DF1 and DF2, and a JSON file which I need to use as a mapping file to create another dataframe (DF3).

DF1:

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|    100|   John| Mumbai|
|    101|   Alex|  Delhi|
|    104|  Divas|Kolkata|
|    108|  Jerry|Chennai|
+-------+-------+-------+

DF2:

+-------+-----------+-------+
|column4|    column5|column6|
+-------+-----------+-------+
|     S1|        New|    xxx|
|     S2|        Old|    yyy|
|     S5|replacement|    zzz|
|    S10|        New|    ppp|
+-------+-----------+-------+

Apart from these, I have a mapping file in JSON format which will be used to generate DF3.

Below is the JSON mapping file:

{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}
{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}
{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}

From this JSON file I need to create DF3 with the columns listed in the targetColumn field of the mapping. For each target column, if sourceField1 is present in DF1, the value comes from that DF1 column; otherwise it comes from sourceField2 in DF2.
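A minimal Spark-free sketch of that rule in plain Scala, using DF1's column names and the three mapping lines above. `Mapping` and `resolveSource` are hypothetical names used only for illustration.

```scala
// Hypothetical case class mirroring one line of the JSON mapping file.
case class Mapping(targetColumn: String, sourceField1: String, sourceField2: String)

// Returns ("DF1", column) when sourceField1 exists in DF1,
// otherwise ("DF2", sourceField2).
def resolveSource(df1Columns: Set[String], m: Mapping): (String, String) =
  if (df1Columns.contains(m.sourceField1)) ("DF1", m.sourceField1)
  else ("DF2", m.sourceField2)

val df1Columns = Set("column1", "column2", "column3")

val mappings = List(
  Mapping("newColumn1", "column2", "column4"),
  Mapping("newColumn2", "column7", "column5"),
  Mapping("newColumn3", "column8", "column6")
)

// Target column -> (source frame, source column)
val resolved: Map[String, (String, String)] =
  mappings.map(m => m.targetColumn -> resolveSource(df1Columns, m)).toMap
```

Only newColumn1 comes from DF1, because column7 and column8 do not exist there.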

Below is the expected output.

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Divas|replacement|       zzz|
|     Jerry|        New|       ppp|
+----------+-----------+----------+

Any help here will be appreciated.

  • What have you tried, Divas? Commented Jul 24, 2018 at 9:02
  • org.apache.spark.sql -> from_json + schema, all built in. Commented Jul 24, 2018 at 9:53
  • @Pavel Can you give an example of how I can implement this in my code? I am new to Spark. Commented Jul 24, 2018 at 9:54
  • ds.withColumn("my_json_column", from_json(col("my_json_column"), validJsonSchema)) Commented Jul 24, 2018 at 10:01
  • @Pavel This is not working. Can you explain what my_json_column is here? Commented Jul 24, 2018 at 10:10

1 Answer


Parse the JSON mapping file into a list of case-class instances like the one below (here the list is written out by hand):

// One instance per line of the JSON mapping file
case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

val srcTgtMappingList = List(
  SrcTgtMapping("newColumn1", "column2", "column4"),
  SrcTgtMapping("newColumn2", "column7", "column5"),
  SrcTgtMapping("newColumn3", "column8", "column6")
)
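For the parsing step itself: with Spark on the classpath, `spark.read.json(path).as[SrcTgtMapping].collect().toList` (after `import spark.implicits._`, where `path` is the mapping file's location) should yield this list, since Spark's JSON reader expects one object per line by default. Without Spark, here is a rough plain-Scala sketch; the `parseLine` helper and its regex are assumptions that only handle flat objects with string values, like the three mapping lines in the question.

```scala
// Same shape as the case class in the answer.
case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

// Hypothetical regex: matches "key":"value" pairs with plain string values only.
// It does not handle escaped quotes, numbers, nulls, or nested objects.
val pair = "\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"".r

def parseLine(line: String): SrcTgtMapping = {
  val fields: Map[String, String] =
    pair.findAllMatchIn(line).map(m => m.group(1) -> m.group(2)).toMap
  SrcTgtMapping(fields("targetColumn"), fields("sourceField1"), fields("sourceField2"))
}

// The three lines of the mapping file from the question
val mappingLines = List(
  """{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}""",
  """{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}""",
  """{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}"""
)

val srcTgtMappingList: List[SrcTgtMapping] = mappingLines.map(parseLine)
```

A missing key makes `fields(...)` throw a NoSuchElementException, which is arguably the right behavior for a malformed mapping file.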

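Add a dummy index column to both DataFrames and join them on that index column. Note that monotonically_increasing_id encodes the partition ID in the generated value, so the IDs of DF1 and DF2 only line up row by row when both DataFrames are partitioned identically.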

import org.apache.spark.sql.functions._

// Tag each row with a unique, increasing (not necessarily consecutive) ID
val df1WithIndex = df1.withColumn("index", monotonically_increasing_id())
val df2WithIndex = df2.withColumn("index", monotonically_increasing_id())

// Joining on the column name keeps a single "index" column in the result
val joinedDf = df1WithIndex.join(df2WithIndex, "index")
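A consecutive index, such as the one `df.rdd.zipWithIndex` assigns in Spark, makes the intended pairing explicit: row i of DF1 goes with row i of DF2. Here is a Spark-free sketch of that positional inner join on plain Scala lists holding the sample rows; the names are illustrative only.

```scala
// Sample rows from DF1 and DF2 as plain tuples.
val df1Rows = List((100, "John", "Mumbai"), (101, "Alex", "Delhi"),
                   (104, "Divas", "Kolkata"), (108, "Jerry", "Chennai"))
val df2Rows = List(("S1", "New", "xxx"), ("S2", "Old", "yyy"),
                   ("S5", "replacement", "zzz"), ("S10", "New", "ppp"))

// index -> DF2 row, analogous to the "index" column added to df2
val df2ByIndex: Map[Int, (String, String, String)] =
  df2Rows.zipWithIndex.map { case (row, i) => i -> row }.toMap

// Inner join on the consecutive index, keeping DF1's row order.
val joined: List[(Int, (Int, String, String), (String, String, String))] =
  df1Rows.zipWithIndex.flatMap { case (r1, i) => df2ByIndex.get(i).map(r2 => (i, r1, r2)) }
```

Because the indices are consecutive and start at 0 on both sides, every DF1 row finds its DF2 partner regardless of how the data would be partitioned.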

Build the select expressions from the mapping and run the query:

val df1Columns = df1WithIndex.columns.toList

// For each mapping, use sourceField1 if DF1 has that column, otherwise sourceField2 (from DF2)
val query = srcTgtMappingList.map { stm =>
  if (df1Columns.contains(stm.sourceField1)) joinedDf.col(stm.sourceField1).alias(stm.targetColumn)
  else joinedDf.col(stm.sourceField2).alias(stm.targetColumn)
}

val output = joinedDf.select(query: _*)
output.show()

Sample output (the row order after the join is not guaranteed to match the input order):

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Jerry|        New|       ppp|
|     Divas|replacement|       zzz|
+----------+-----------+----------+
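As a quick sanity check of the whole transformation, here is a Spark-free sketch that models each row as a Map from column name to value, pairs the rows by position, and applies the same sourceField1/sourceField2 rule. It only illustrates the logic on the sample data and is not a replacement for the DataFrame code; unlike the join, `zip` keeps the input row order.

```scala
// Rows are modelled as Map(columnName -> value); all values kept as strings.
case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

val df1: List[Map[String, String]] = List(
  Map("column1" -> "100", "column2" -> "John", "column3" -> "Mumbai"),
  Map("column1" -> "101", "column2" -> "Alex", "column3" -> "Delhi"),
  Map("column1" -> "104", "column2" -> "Divas", "column3" -> "Kolkata"),
  Map("column1" -> "108", "column2" -> "Jerry", "column3" -> "Chennai")
)
val df2: List[Map[String, String]] = List(
  Map("column4" -> "S1", "column5" -> "New", "column6" -> "xxx"),
  Map("column4" -> "S2", "column5" -> "Old", "column6" -> "yyy"),
  Map("column4" -> "S5", "column5" -> "replacement", "column6" -> "zzz"),
  Map("column4" -> "S10", "column5" -> "New", "column6" -> "ppp")
)
val mappings = List(
  SrcTgtMapping("newColumn1", "column2", "column4"),
  SrcTgtMapping("newColumn2", "column7", "column5"),
  SrcTgtMapping("newColumn3", "column8", "column6")
)

// Column names available in DF1 (all rows share the same keys here).
val df1Columns: Set[String] = df1.head.keySet

// Take sourceField1 from the DF1 row if DF1 has it, otherwise sourceField2 from the DF2 row.
def valueFor(m: SrcTgtMapping, r1: Map[String, String], r2: Map[String, String]): String =
  if (df1Columns.contains(m.sourceField1)) r1(m.sourceField1) else r2(m.sourceField2)

// Pair rows by position (what the index join does), then project the target columns.
val header: List[String] = mappings.map(_.targetColumn)
val df3: List[List[String]] =
  df1.zip(df2).map { case (r1, r2) => mappings.map(m => valueFor(m, r1, r2)) }
```

The resulting rows match the expected output from the question, in the original order.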

Hope this approach helps.


4 Comments

Thanks for providing this awesome solution, but my requirement is to do it in Scala instead of Spark SQL. Can you please help me do the same thing using Scala only?
Hi @DivasNikhra, I modified my solution to use DataFrames only.
Thank you so much for providing this solution. This is exactly what I was looking for :)
Can you please help me with this question? stackoverflow.com/questions/51928103/…
