
I'm using Scala 2.12 and Spark 3.0.0. I need to build a single JSON string in which each attribute comes from a different table, and save the resulting JSON in another dataframe. For example (using only one row per table to keep it simple):

tableA

id  action  date
u1  insert  20210428

tableB

id  name       date
u1  some name  20210428

I need to return the following:

{
    "A": [
    {
      "id": "u1",
      "action": "insert",
      "date": "20210428"
    }
    ],
    "B": [
    {
      "id": "u1",
      "name": "some name",
      "date": "20210428"
    }
    ]
}

I've tried many things, but the closest I've gotten is doing the following for each table:

val tableADF = spark.read.format("delta").load(path +"/tableA")
val tableBDF = spark.read.format("delta").load(path +"/tableB")

Create a dataframe for each table with all values converted to JSON:

val tableAJsonDF = tableADF.groupBy("date").agg(collect_list(struct($"id",$"action")).alias("attributesA"))
val tableBJsonDF = tableBDF.groupBy("date").agg(collect_list(struct($"id",$"name")).alias("attributesB"))

date      attributesA
20210428  [{"id":"u1", "action": "insert"}]

date      attributesB
20210428  [{"id":"u1", "name": "some name"}]

Now combine the json from both tables into one json to be added to a new dataframe:

val schema = new StructType().add("request", StringType)
val requestDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val resultDF = requestDF.withColumn("request", concat(to_json(tableAJsonDF("attributesA")), 
                                                      to_json(tableBJsonDF("attributesB"))))

But I get the following error. I read that this type of error happens when you try to combine columns from two different dataframes, but I can't find a way to create a single JSON, as shown in the desired result, by combining both attributes into one new column. Any ideas?

org.apache.spark.sql.AnalysisException: Resolved attribute(s) attributesA#4726,attributesB#4783 missing from request#24790 in operator !Project [concat(to_json(attributesA#4726, Some(EST)), to_json(attributesB#4783, Some(EST))) AS request#24792].;;

1 Answer

You need to join the dataframes so that both columns belong to the same dataframe before you combine them, e.g.:

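import org.apache.spark.sql.functions.{array, col, struct, to_json}

// Wrap each row of tableA in a one-element array of structs, keeping id as the join key
val t1 = tableADF.select(
    col("id"), 
    array(struct(tableADF.columns.map(col):_*)).as("A")
)

// Same for tableB
val t2 = tableBDF.select(
    col("id"), 
    array(struct(tableBDF.columns.map(col):_*)).as("B")
)

// After the join, A and B are columns of one dataframe and can be serialized together
val result = t1.join(t2, Seq("id")).select(to_json(struct("A", "B")).as("result"))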

result.show(false)
+--------------------------------------------------------------------------------------------------------------+
|result                                                                                                        |
+--------------------------------------------------------------------------------------------------------------+
|{"A":[{"id":"u1","action":"insert","date":"20210428"}],"B":[{"id":"u1","name":"some name","date":"20210428"}]}|
+--------------------------------------------------------------------------------------------------------------+
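
If a table can hold several rows for the same date, the one-element array above yields one JSON string per joined pair of rows. A possible variant, closer to the groupBy attempt in the question, is to collect the rows per date first and then join on date. This is only a sketch under assumptions: the object name CombineJson, the helpers collectByDate and buildRequest, and the in-memory sample data standing in for the Delta tables are hypothetical and not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

// Hypothetical helper object, for illustration only.
object CombineJson {
  // Collect every row of df into one array of structs per date, named `alias`.
  def collectByDate(df: DataFrame, alias: String): DataFrame =
    df.groupBy("date")
      .agg(collect_list(struct(df.columns.map(col): _*)).as(alias))

  // Join the two aggregated dataframes on date so that A and B are columns
  // of the same dataframe, then serialize both into one JSON string.
  def buildRequest(tableADF: DataFrame, tableBDF: DataFrame): DataFrame =
    collectByDate(tableADF, "A")
      .join(collectByDate(tableBDF, "B"), Seq("date"))
      .select(to_json(struct(col("A"), col("B"))).as("request"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("combine-json")
      .getOrCreate()
    import spark.implicits._

    // Assumed sample data replacing the Delta tables from the question.
    val tableADF = Seq(("u1", "insert", "20210428")).toDF("id", "action", "date")
    val tableBDF = Seq(("u1", "some name", "20210428")).toDF("id", "name", "date")

    buildRequest(tableADF, tableBDF).show(false)
    spark.stop()
  }
}
```

Two caveats: the inner join drops any date that appears in only one table (passing a join type such as "full_outer" as a third argument to join would keep them), and collect_list does not guarantee the order of the elements within each array.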