
I would really love some help with parsing nested JSON data using PySpark-SQL. The data has the following schema (the blank lines are redactions for confidentiality purposes):

Schema

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1: string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- longitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)

My goal: I essentially want to get the data into the following data frame:

restaurant_id | latitude | longitude | timezone

I have tried:

from pyspark.sql.functions import col, explode

dfj = spark.read.option("multiLine", False).json("/file/path")

result = dfj.select(col('restaurant_id'),
                    explode(col('location_info')).alias('location_info'))

# SQL operation
result.createOrReplaceTempView('result')

subset_data = spark.sql(
'''
SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
FROM result
'''
).show()

# Also tried this to read in
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
          .values()
          .flatMap(lambda x: x
                   .replace("{", "#!#")
                   .split("#!#")))

But oddly enough, this only gives me the following, for the first object / restaurant_id:

+-------------+--------+---------+--------------------+
|restaurant_id|latitude|longitude|           timestamp|
+-------------+--------+---------+--------------------+
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
+-------------+--------+---------+--------------------+

My research indicated that this may have something to do with the way JSON files are structured at the source. For example:

{}{
}{
}

So the objects are concatenated back to back rather than being one per line (and the file isn't a proper multi-line document either). I'm wondering what to do about this as well.

Thank you very much for reading; any help would really be appreciated. I know I can always count on SO to be helpful.

If you can share the JSON for more than one restaurant (scrub anything that is critical), it will be really helpful. I think you meant timestamp = timezone, correct? Commented Mar 20, 2020 at 18:36

2 Answers


I was able to solve this by reading the JSON file I've described above as follows; hope it helps:

from pyspark.sql.functions import col, explode

# Reading multiple files in the dir: prefix each object with a newline so the
# concatenated records can be split into one JSON object per line
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
          .values()
          .flatMap(lambda x: x
                   .replace('{"restaurant_id', '\n{"restaurant_id').split('\n')))


# explode here to get restaurant_id alongside the nested data
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info'))


# Via SQL operation: this solves the parsing problem
exploded_source_df_1.createOrReplaceTempView('result_1')

subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
FROM result_1
'''
).persist()
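
For reference, the same flattening can also be done without the temp view by using the DataFrame API directly. This is just a sketch assuming the column names shown in the schema above (use timezone instead of timestamp if that is the real field name):

from pyspark.sql.functions import col, explode

# Same idea without SQL: explode the array, then pull the nested struct fields
# out with dot notation. Column names are assumed from the schema above.
subset_data_2 = (
    source_df_1
    .select(col('restaurant_id'),
            explode(col('location_info')).alias('location_info'))
    .select('restaurant_id',
            col('location_info.latitude').alias('latitude'),
            col('location_info.longitude').alias('longitude'),
            col('location_info.timezone').alias('timezone'))
)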



The spark.read.json() reader assumes one json object per text line. I'm not sure I follow the insertion of the \n and then the split... sounds like maybe the file is malformed?
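
For illustration, a minimal sketch of the newline-delimited layout that the default reader expects (the records below are hypothetical):

# Each line is one complete, self-contained JSON object (JSON Lines / NDJSON),
# which spark.read.json() can split without any pre-processing.
jsonl_example = [
    '{"restaurant_id": "25", "location_info": [{"latitude": "2.0", "longitude": "-8.0", "timezone": "UTC"}]}',
    '{"restaurant_id": "26", "location_info": [{"latitude": "3.0", "longitude": "-7.0", "timezone": "UTC"}]}',
]
df = spark.read.json(sc.parallelize(jsonl_example))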

Perhaps there is a record separator such as a \r which you can't see. The Linux command od -c <file name> | head -10 will help show what the characters are in between records.

If the schema is well known, supply that schema object; this eliminates the first pass that does schema inference, e.g. spark.read.schema(schema).json('path to directory'), and will definitely make your read operation much faster. Save the objects in Parquet or Delta Lake format for better performance if you need to query them later.
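
For example, here's a minimal sketch of defining the schema up front and reading with it. The struct only covers the fields discussed in this question (the redacted fields would need to be added), and the output path is a placeholder:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Explicit schema covering only the fields discussed here; supplying it
# lets Spark skip the schema-inference pass over the data.
schema = StructType([
    StructField("restaurant_id", StringType(), True),
    StructField("location_info", ArrayType(StructType([
        StructField("restaurant_type", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("timezone", StringType(), True),
    ])), True),
])

dfj = spark.read.schema(schema).json("/file/path")

# Save as Parquet (or Delta) so later queries don't re-parse the raw JSON
dfj.write.mode("overwrite").parquet("/output/path/restaurants")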

Databricks' COPY INTO or the cloudFiles format (Auto Loader) will speed up ingestion and reduce latency: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html
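
A rough sketch of what an Auto Loader (cloudFiles) ingest could look like on Databricks; the S3 paths and checkpoint location below are placeholders, and the schema is the one defined above:

# Databricks Auto Loader sketch (placeholder S3 paths). cloudFiles discovers
# new files incrementally instead of re-listing the whole prefix on every run.
stream = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .schema(schema)
          .load("s3://bucket/restaurant-stream/"))

(stream.writeStream
       .format("parquet")
       .option("checkpointLocation", "s3://bucket/checkpoints/restaurant-stream/")
       .trigger(once=True)      # process everything currently there, then stop
       .start("s3://bucket/tables/restaurant_raw/"))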

2 Comments

Hi Mr. Moore, thank you for your response. (1) "I'm not sure I follow the insertion... malformed?" --> the JSON file here is actually a stream file. The file name is like data-stream-1-2020-05-28-14-00-24-34567fg-5tgh-56yh-tyhn-45tghnmj, so the only way I can read it is as JSON. I also don't see any characters between the different records I am trying to separate; I tried the Linux command. (2) I am already mounting my Databricks workspace to S3 using sc._jsc.hadoopConfiguration().set(aws_key, aws_key_id), so I assume the copy will achieve the same effect?
If you're on Databricks (the question is tagged databricks), COPY INTO will track which files have been ingested, simplifying the ingestion process and ensuring files are not ingested multiple times. If you're using open-source Spark, COPY INTO isn't available, and you'll have to deal with S3 and 'eventual consistency' type issues.
