
I am brand new to Spark (hours) and rather inexperienced with Scala, but I have a long-standing desire to become more familiar with both.

I have a rather trivial task. I have two DataFrames that I am importing from two JSON files: one with uuid, text, tag_ids, and the other with the tags' id, term. I would like to produce a new JSON file that I can import into Solr containing uuid, text, tag_ids, tag_terms.

val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")



text.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- tag_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)

tags.printSchema()

root
 |-- id: string (nullable = true)
 |-- term: string (nullable = true)
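
For reference, DataFrames matching these schemas can also be mocked up in memory, which makes it easy to test a join locally (hypothetical sample rows chosen to line up with the desired output below; sampleText and sampleTags are illustrative names, not the original files):

import spark.implicits._

// Hypothetical rows matching the schemas above
val sampleText = Seq(
  ("uuid-1", Seq("1", "2"), "foo"),
  ("uuid-2", Seq("2", "3"), "bar")
).toDF("uuid", "tag_ids", "text")

val sampleTags = Seq(
  ("1", "tag1"),
  ("2", "tag2"),
  ("3", "tag3")
).toDF("id", "term")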


# desired output
+--------------------+------+---------+------------+
|                uuid| text | tag_ids |   tag_terms|
+--------------------+------+---------+------------+
|cf5c1f4c-96e6-4ca...| foo  |    [1,2]| [tag1,tag2]|
|c9834e2e-0f04-486...| bar  |    [2,3]| [tag2,tag3]|
+--------------------+------+---------+------------+

It is difficult to show everything I have been trying. Essentially, .join() has issues with tag_ids being an array. I can explode() tag_ids and join against the tags to pick up the terms, but reassembling the result into a new DataFrame to export is still beyond my level.

2 Answers


Solution using explode:

import org.apache.spark.sql.functions.{explode, first, collect_list}
import spark.implicits._

val result = text
  .withColumn("tag_id", explode($"tag_ids"))    // one row per (uuid, tag_id)
  .join(tags, $"tag_id" === $"id")              // look up each tag's term
  .groupBy("uuid", "tag_ids")                   // collapse back to one row per document
  .agg(first("text") as "text", collect_list("term") as "tag_terms")

1 Comment

Thanks, this worked. In my actual data I had a few more columns, and some rows had null values or no tag_ids at all; that was easily solved with a left join.
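
In case it helps anyone else, the left-join variant mentioned here might look roughly like this (a sketch, not from the answer above; explode_outer needs Spark 2.2+):

import org.apache.spark.sql.functions.{explode_outer, first, collect_list}
import spark.implicits._

val resultAll = text
  .withColumn("tag_id", explode_outer($"tag_ids"))   // keeps rows whose tag_ids is null or empty
  .join(tags, $"tag_id" === $"id", "left")           // left join so unmatched ids don't drop rows
  .groupBy("uuid", "tag_ids")
  .agg(first("text") as "text", collect_list("term") as "tag_terms")

Since collect_list skips nulls, rows without tags come through with an empty tag_terms array.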

Try this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Read the two JSON files directly into DataFrames
val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")

// Register them as temporary views so they can be joined with SQL
text.createOrReplaceTempView("A")
tags.createOrReplaceTempView("B")

// Explode tag_ids with LATERAL VIEW, join each id to its term,
// then collect the terms back into an array per uuid
spark.sql("""
  SELECT d1.uuid, first(d1.text) AS text, d1.tag_ids, collect_list(d2.term) AS tag_terms
  FROM A d1
  LATERAL VIEW explode(d1.tag_ids) t AS tag_id
  JOIN B d2 ON t.tag_id = d2.id
  GROUP BY d1.uuid, d1.tag_ids
""").show()
