
I have a dataframe with this schema:

root
|-- properties: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- value: string (nullable = true)

Here are 3 example rows from the df:

[]

[{"key":"id", "value":"as143"},
  {"key":"user", "value":"John Doe"},
  {"key":"email", "value":"[email protected]"},
  {"key":"location", "value":"KY, USA"}]

[{"key":"id", "value":"bd143"},
  {"key":"user", "value":"Adam Smith"},
  {"key":"email", "value":"[email protected]"}]

So each user is in its own row of the df. The amount of data in each row can differ, and null values are allowed.

I would like to make a new df in which each key becomes a column name and the column contains all the corresponding values for that key. For example, I would have a column titled 'user' and each row would hold a different user name.

I have tried to access the sub-fields using this code, but I get an error:

from pyspark.sql.functions import col, split

keys = table.select('properties.key').distinct().collect()[0][0]
table.withColumn('value', split(table.properties.value, ',')) \
    .select(col('value')[0].alias(keys[0]),
            col('value')[1].alias(keys[1]),
            col('value')[2].alias(keys[2]),
            col('value')[3].alias(keys[3])).display()

I have also tried to create a key/value map, because I need to do this for other columns in the dataframe too, and for some of them it is hard to tell how many key/value pairs there are, since null values are allowed. So I may use pyspark.sql.functions.from_json.

I feel this is the preferred way; however, I have not had success with that either. I have not been able to convert to a map, and I think it is because rather than having "key": "value" pairs I have "key": "key 1", "value": "value 1", "key": "key 2", "value": "value 2", "key": "key 3", "value": "value 3", etc., all in the same row.

This is the code I used:

from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType

table.withColumn('properties', from_json(table.properties, MapType(StringType(), StringType())))

and I get the error:

cannot resolve 'entries' due to data type mismatch: argument 1 requires string type, however 'table.properties' is of array<struct<key:string, value:string>> type.

I am not exactly sure how to go about converting an array type like this to a map.
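One thing I have since come across is pyspark.sql.functions.map_from_entries (Spark 2.4+), which converts an array of key/value structs directly to a map, so no from_json parsing is needed. A minimal sketch, assuming the schema above:

from pyspark.sql import functions as F

# map_from_entries turns array<struct<key,value>> into map<string,string>
table_with_map = table.withColumn("props_map", F.map_from_entries("properties"))

# individual values can then be read out of the map by key
table_with_map.select(F.col("props_map")["user"].alias("user")).show()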

  • Can the array properties contain multiple users? If so, can you show an example? I think this can be done using a pivot after exploding the array, but it's not clear how you can have the same key. Commented Jan 22, 2022 at 10:36
  • I have edited my question so it shows 3 rows. Each row in the actual data frame has anywhere from 0 to 4 key:value pairs. table.properties.key gives me a list of my keys and table.properties.value gives a list of the corresponding values. Commented Jan 22, 2022 at 18:28

1 Answer

Assuming this is your input dataframe:

data = '[{"properties":[]},{"properties":[{"key":"id","value":"as143"},{"key":"user","value":"John Doe"},{"key":"email","value":"[email protected]"},{"key":"location","value":"KY, USA"}]},{"properties":[{"key":"id","value":"bd143"},{"key":"user","value":"Adam Smith"},{"key":"email","value":"[email protected]"}]}]'
table = spark.read.json(spark.sparkContext.parallelize([data]))
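
If you print the schema, it should match the one in the question:

table.printSchema()

#root
# |-- properties: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- value: string (nullable = true)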

Your column properties is of type array of structs; that's why you get the data type mismatch error when you try to use the from_json function, which expects a string column. You can simply explode the array to get the 2 columns key and value like this:

table.selectExpr("inline(properties)").show()

#+--------+-----------------+
#|     key|            value|
#+--------+-----------------+
#|      id|            as143|
#|    user|         John Doe|
#|   email|  [email protected]|
#|location|          KY, USA|
#|      id|            bd143|
#|    user|       Adam Smith|
#|   email|[email protected]|
#+--------+-----------------+
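
inline(properties) is just a compact way of exploding the array and then flattening the struct fields; if you prefer the DataFrame API, this sketch is equivalent:

from pyspark.sql import functions as F

# explode produces one row per array element, then we flatten the struct
table.select(F.explode("properties").alias("p")) \
    .select("p.key", "p.value") \
    .show()

Note that a row with an empty properties array produces no output rows in either version (use explode_outer if you need to keep it).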

That said, what you now want is to pivot the column key. In order to use pivot, we need some ID column to group by; if you don't have such a column, you can assign a row_id using the monotonically_increasing_id function before exploding the array:

from pyspark.sql import functions as F

result = table.withColumn("row_id", F.monotonically_increasing_id()) \
    .selectExpr("row_id", "inline(properties)") \
    .groupBy("row_id").pivot("key").agg(F.first("value")).drop("row_id")

result.show()

#+-----------------+-----+--------+----------+
#|            email|   id|location|      user|
#+-----------------+-----+--------+----------+
#|  [email protected]|as143| KY, USA|  John Doe|
#|[email protected]|bd143|    null|Adam Smith|
#+-----------------+-----+--------+----------+
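
If the set of keys is known up front, you can also pass it to pivot explicitly, which spares Spark an extra job to compute the distinct key values. A sketch, assuming the four keys from the example are the only ones:

known_keys = ["id", "user", "email", "location"]  # assumed fixed set of keys

result = table.withColumn("row_id", F.monotonically_increasing_id()) \
    .selectExpr("row_id", "inline(properties)") \
    .groupBy("row_id").pivot("key", known_keys) \
    .agg(F.first("value")).drop("row_id")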

1 Comment

Thank you so much! I just couldn't figure this out before. I really appreciate it!
