
I have to work with a file format where each row is a JSON object. For example:

```
{'Attribute 1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}
{'Attribute 1': 'B', 'Attribute 2': 2.0, 'Attribute 3': ['A'], 'Attribute 4': {'A': 4}}
{'Attribute 1': 'C', 'Attribute 2': 1.7, 'Attribute 3': ['A','C'], 'Attribute 4': {'A': 3}}
```

Note that this is not a valid JSON file, since the objects are not enclosed in an array. Also, the actual structures are far larger and more deeply nested. The files are stored in S3. I've only used Parquet or CSV before, so I'm not sure how to read these files.

I'm currently writing a process to join this data with several other tables, and since the data is large and stored in S3, I'm using pyspark.sql on an EMR cluster to do the operations. I can create a table with a single column containing the objects as strings using:

```python
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

# `sc` is the existing SparkContext (e.g. the one the pyspark shell provides)
sqlContext = SQLContext(sc)

schema = StructType([
    StructField('json_format', StringType())
])

context = sqlContext.read
context = context.schema(schema)

# folder_path: the S3 location of the files
df = context.load(
    folder_path,
    format='com.databricks.spark.csv',
    delimiter=','
)
df.createOrReplaceTempView('my_table')
```

How can I transform this column into a dictionary where I can access the various attributes? Is there an equivalent of a lambda function?

1 Answer

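To turn each row into a valid JSON object, we can replace every single quote (') with a double quote ("), and then access the attributes with the get_json_object() function. This assumes no key or value contains an apostrophe of its own, since it would be replaced as well.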
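A word of caution on the blanket replacement: an apostrophe inside a value (say `O'Brien`) also becomes a double quote and yields invalid JSON. If the rows are valid Python literals, as the sample appears to be, a more robust sketch parses them with `ast.literal_eval` and re-serializes with `json.dumps`. The helper names below are hypothetical, and rows containing JSON-only tokens such as `true`, `false` or `null` would make `literal_eval` raise `ValueError`.

```python
import ast
import json


def to_json_string(row_text):
    """Convert one Python-literal row (single-quoted keys/strings) into JSON text."""
    return json.dumps(ast.literal_eval(row_text))


def is_valid_json(text):
    """Return True if `text` parses as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


sample = "{'Attribute 1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}"
tricky = "{'Attribute 1': \"O'Brien\", 'Attribute 2': 2.0}"

# Blanket quote replacement turns the apostrophe in O'Brien into a stray quote.
naive = tricky.replace("'", '"')

# Parsing the literal first keeps the apostrophe inside the string value.
converted = to_json_string(tricky)
```

In Spark, the same function can be wrapped as a Python UDF, e.g. `F.udf(to_json_string, StringType())` with `from pyspark.sql import functions as F` and `from pyspark.sql.types import StringType`, and applied to the string column before calling `get_json_object`; expect it to be slower than the built-in `regexp_replace`.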

Example:

```python
from pyspark.sql.functions import col, regexp_replace

df = sqlContext.sql("""select string("{'Attribute1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}") as str""")

# replacing ' with " using regexp_replace
df = df.withColumn("str", regexp_replace(col("str"), "'", '"'))
df.show(10, False)

#+----------------------------------------------------------------------------------------------+
#|str                                                                                           |
#+----------------------------------------------------------------------------------------------+
#|{"Attribute1": "A", "Attribute 2": 1.5, "Attribute 3": ["A","B","C"], "Attribute 4": {"A": 5}}|
#+----------------------------------------------------------------------------------------------+

# registering a temp view (registerTempTable is deprecated since Spark 2.0)
df.createOrReplaceTempView("tt")

# accessing Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3') from tt").show()
#+-------------+
#|          _c0|
#+-------------+
#|["A","B","C"]|
#+-------------+

# accessing the first array element of Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3[0]') from tt").show()
#+---+
#|_c0|
#+---+
#|  A|
#+---+

# accessing Attribute 2
sqlContext.sql("select get_json_object(str,'$.Attribute 2') from tt").show()
#+---+
#|_c0|
#+---+
#|1.5|
#+---+
```