
I'm using Spark 2.4.3 and Scala 2.11.

Below is the JSON string currently held in a DataFrame column. I'm trying to store the schema of this JSON string in another column using the schema_of_json function, but it throws the error below. How can I resolve this?

{
  "company": {
    "companyId": "123",
    "companyName": "ABC"
  },
  "customer": {
    "customerDetails": {
      "customerId": "CUST-100",
      "customerName": "CUST-AAA",
      "status": "ACTIVE",
      "phone": {
        "phoneDetails": {
          "home": {
            "phoneno": "666-777-9999"
          },
          "mobile": {
            "phoneno": "333-444-5555"
          }
        }
      }
    },
    "address": {
      "loc": "NORTH",
      "adressDetails": [
        {
          "street": "BBB",
          "city": "YYYYY",
          "province": "AB",
          "country": "US"
        },
        {
          "street": "UUU",
          "city": "GGGGG",
          "province": "NB",
          "country": "US"
        }
      ]
    }
  }
}

Code:

val df = spark.read.textFile("./src/main/resources/json/company.txt")
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"company":{"companyId":"123","companyName":"ABC"},"customer":{"customerDetails":{"customerId":"CUST-100","customerName":"CUST-AAA","status":"ACTIVE","phone":{"phoneDetails":{"home":{"phoneno":"666-777-9999"},"mobile":{"phoneno":"333-444-5555"}}}},"address":{"loc":"NORTH","adressDetails":[{"street":"BBB","city":"YYYYY","province":"AB","country":"US"},{"street":"UUU","city":"GGGGG","province":"NB","country":"US"}]}}}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


df.withColumn("jsonSchema",schema_of_json(col("value")))

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [value#0, schemaofjson(value#0) AS jsonSchema#10]
+- Project [value#0]
   +- Relation[value#0] text

3 Answers


The workaround I found was to pass the column's value to the schema_of_json function as a string literal, as shown below.

df.withColumn("jsonSchema",schema_of_json(df.select(col("value")).first.getString(0)))

Courtesy:

Implicit schema discovery on a JSON-formatted Spark DataFrame column
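
Note that .first materializes a single row on the driver, so the derived schema reflects only that first row's JSON. A minimal end-to-end sketch of the same workaround, assuming the df from the question:

import org.apache.spark.sql.functions.{col, schema_of_json}

// schema_of_json needs a literal, not a per-row column reference, so pull
// one sample JSON string onto the driver first.
val sample: String = df.select(col("value")).first.getString(0)

// Every row gets the same schema string, derived from that single sample.
val withSchema = df.withColumn("jsonSchema", schema_of_json(sample))
withSchema.select("jsonSchema").show(1, truncate = false)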


1 Comment

For PySpark it would be df.withColumn("jsonSchema", schema_of_json(df.select(col("value")).first()[0]))
1

Since SPARK-24709 was introduced, schema_of_json accepts only literal strings. You can extract the schema of a string column in DDL format by calling:

import spark.implicits._  // required for .as[String]

spark.read
  .json(df.select("value").as[String])
  .schema
  .toDDL
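
If the end goal is to parse the column rather than just print its schema, the inferred StructType can also be passed straight to from_json. A sketch under the same assumptions (toDDL is only needed when you want the textual form):

import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

// Infer one schema across all rows, then parse each row's JSON with it.
val inferredSchema = spark.read.json(df.select("value").as[String]).schema

val parsed = df.withColumn("parsed", from_json(col("value"), inferredSchema))
parsed.select("parsed.customer.customerDetails.customerId").show(false)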

1 Comment

Thanks. In my case, NOT every row has a JSON string with the same schema. How do I handle that with this approach?
0

If one is looking for a PySpark answer:

import json

import pyspark.sql.functions as F
import pyspark.sql.types as T

def process(json_content):
    # Return the top-level keys of the JSON string, or [] for null input.
    if json_content is None:
        return []
    try:
        # Parse the JSON and extract the top-level keys only
        keys = json.loads(json_content).keys()
        return list(keys)
    except Exception as e:
        # Stringify the error so it fits the ArrayType(StringType()) result
        return [str(e)]

udf_function = F.udf(process, T.ArrayType(T.StringType()))
my_df = my_df.withColumn("schema", udf_function(F.col("json_raw")))
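
A hypothetical usage sketch (the data contents, my_df, and json_raw names here are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical single-column DataFrame of raw JSON strings.
data = [('{"company": {"companyId": "123"}, "customer": {}}',), (None,)]
my_df = spark.createDataFrame(data, ["json_raw"])

my_df.withColumn("schema", udf_function(F.col("json_raw"))).show(truncate=False)
# The schema column holds the top-level keys, e.g. [company, customer].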

