
I want to split the JSON-formatted column values in a Spark DataFrame:

The allrules_internal table in Hive:

+-----------+-------------------------------------+--------+
| tablename | condition                           | filter |
+-----------+-------------------------------------+--------+
| documents | {"col_list":"document_id,comments"} | NA     |
| person    | {"per_list":"person_id, name, age"} | NA     |
+-----------+-------------------------------------+--------+

Code:

val allrulesDF = spark.read.table("default" + "." + "allrules_internal")
allrulesDF.show()

val df1 = allrulesDF
  .select(allrulesDF.col("tablename"), allrulesDF.col("condition"),
    allrulesDF.col("filter"), allrulesDF.col("dbname"))
  .collect()

Here I want to split the condition column values. From the example above, I want to keep the "document_id, comments" part. In other words, the condition column holds a key/value pair, but I only want the value part.

If the allrules_internal table has more than one row, how do I split the value for each row?

df1.foreach(row => {
  // condition = row.getAs("condition").toString() // how do I retrieve the value here?
  println(condition)
  val tableConditionDF = spark.sql("SELECT " + condition + " FROM " + db_name + "." + table_name)
  tableConditionDF.show()
})
  • Will the condition column always have a single key-value pair, and will the key always be col_list? Maybe you could add a couple more rows to the input table in the question. Commented Oct 10, 2018 at 8:14
  • edited question. Commented Oct 10, 2018 at 11:38

1 Answer


You can use the from_json function:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

allrulesDF
  .withColumn("condition", from_json($"condition",
    StructType(Seq(StructField("col_list", DataTypes.StringType, true)))))
  .select($"tablename", $"condition.col_list".as("condition"))

Calling .show(false) on the result prints:

+---------+---------------------+
|tablename|condition            |
+---------+---------------------+
|documents|document_id, comments|
+---------+---------------------+

Explanation:

With the withColumn method, you can create a new column by applying a function to one or more existing columns. Here that function is from_json, which takes the column containing a JSON string and a StructType object describing the schema of the JSON in that column. Finally, you just select the columns you need. Note that this schema only extracts the col_list key, so the person row (whose key is per_list) would come out as null; see the sketch after the comments below for one way to handle varying keys.
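
To also cover the "how do I retrieve it?" part of the question, here is a minimal end-to-end sketch combining this with the question's loop. It assumes the table really has a dbname column (the question's select references one, although the sample table above doesn't show it); all other names come from the question:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val allrulesDF = spark.read.table("default.allrules_internal")

// Extract the value stored under the "col_list" key of the JSON condition.
val parsedDF = allrulesDF
  .withColumn("condition", from_json($"condition",
    StructType(Seq(StructField("col_list", DataTypes.StringType, true)))))
  .select($"tablename", $"dbname", $"condition.col_list".as("condition"))

// The rules table is small, so collect it to the driver and run one query per rule.
parsedDF.collect().foreach { row =>
  val dbName    = row.getAs[String]("dbname")
  val tableName = row.getAs[String]("tablename")
  val condition = row.getAs[String]("condition") // e.g. "document_id,comments"
  spark.sql(s"SELECT $condition FROM $dbName.$tableName").show()
}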

Hope it helped!


2 Comments

If I have more than one row in the allrules_internal table, how do I apply this? For example:

df1.foreach(row => {
  condition = allrulesDF
    .withColumn(row.getAs("condition"), from_json($"condition",
      StructType(Seq(StructField("col_list", DataTypes.StringType, true)))))
    .select($"condition.col_list".as("condition"))
  // condition = row.getAs("condition").toString()
  println(condition)
  val tableConditionDF = spark.sql("SELECT " + condition + " FROM " + db_name + "." + table_name)
  tableConditionDF.show()
})
edited question
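
Regarding the comment above: if the JSON key differs from row to row (col_list for documents, per_list for person), one option is to parse condition as a generic string-to-string map instead of hard-coding the key. A minimal sketch, assuming Spark 2.3+ (for map_values) and the allrulesDF from the question:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Parse each condition as a Map[String, String] so the key name
// ("col_list", "per_list", ...) does not have to be hard-coded,
// then keep the first (and only) value in that map.
val parsedDF = allrulesDF
  .withColumn("condition", from_json($"condition", MapType(StringType, StringType)))
  .select($"tablename", map_values($"condition").getItem(0).as("condition"))

This yields one row per table with the comma-separated column list as the condition value, ready for the same collect-and-query loop as above.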
