
I have the following dataframe:

  >>> df.printSchema()
  root
   |-- I: string (nullable = true)
   |-- F: string (nullable = true)
   |-- D: string (nullable = true)
   |-- T: string (nullable = true)
   |-- S: string (nullable = true)
   |-- P: string (nullable = true)

Column F is a dictionary stored as a string:

   {"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04",...}

I need to parse column F as follows and create two new columns, P and N:

   P1 => "1:0.01"
   P2 => "3:0.03,4:0.04"
   and so on

 +--------+--------+-----------------+-----+------+--------+----+
 | I      |  P     | N               |  D  | T    | S      | P  |
 +--------+--------+-----------------+-----+------+--------+----+
 | i1     |  p1    | 1:0.01          |  d1 | t1   | s1     | p1 |
 | i1     |  p2    | 3:0.03,4:0.04   |  d1 | t1   | s1     | p1 |
 | i1     |  p3    | 3:0.03,4:0.04   |  d1 | t1   | s1     | p1 |
 | i2     |  ...   | ...             |  d2 | t2   | s2     | p2 |
 +--------+--------+-----------------+-----+------+--------+----+

Any suggestions in PySpark?

  • Possible duplicate of Pyspark: explode json in column to multiple columns Commented Sep 4, 2019 at 18:32
  • Thanks for your comment. In the provided link the dictionary only has two keys. Here the dictionary has many keys. Commented Sep 4, 2019 at 19:38
  • For that answer, the schema is provided and known beforehand, but in my case the schema cannot be provided. There the dictionary only has two keys, key1 and key2; mine might have key1, key2, ..., key128, and not a fixed number. How do you provide a flexible schema? Commented Sep 4, 2019 at 19:46
  • Also I don't want to add new columns per key BUT new rows. It is more of an "explode". Commented Sep 4, 2019 at 19:48
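For the schema-free case raised in the comments, the F string can be treated as a map with an arbitrary number of keys. Here is a minimal plain-Python sketch of the intended row explosion; the function name and row layout are illustrative. In PySpark the same effect can be had without a fixed schema via `explode(from_json('F', 'map<string,string>'))`:

```python
import json

def explode_f_column(row):
    """Explode one row's F (a JSON dict string) into one row per key.

    Works for any number of keys (P1 ... P128); no fixed schema needed.
    PySpark equivalent (sketch):
        df.select('I', explode(from_json('F', 'map<string,string>')).alias('P', 'N'))
    """
    pairs = json.loads(row['F'])
    base = {k: v for k, v in row.items() if k != 'F'}
    return [dict(base, P=p, N=n) for p, n in pairs.items()]

rows = explode_f_column({'I': 'i1', 'F': '{"P1":"1:0.01","P2":"3:0.03,4:0.04"}'})
for r in rows:
    print(r)
# {'I': 'i1', 'P': 'P1', 'N': '1:0.01'}
# {'I': 'i1', 'P': 'P2', 'N': '3:0.03,4:0.04'}
```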

2 Answers


Try this:

  1. The DataFrame you have
from pyspark.sql import functions as F

df = spark.createDataFrame([('id01', '{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}')], ['I', 'F'])
df.printSchema()
df.show(truncate=False)

You can see the schema and data are the same as in your post.

root
 |-- I: string (nullable = true)
 |-- F: string (nullable = true)

+----+---------------------------------------------------------+
|I   |F                                                        |
+----+---------------------------------------------------------+
|id01|{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}|
+----+---------------------------------------------------------+

  2. Process the string to distinguish the sub-dicts
# remove '{' and '}'
df = df.withColumn('array', F.regexp_replace('F', r'\{', ''))
df = df.withColumn('array', F.regexp_replace('array', r'\}', ''))

# replace the comma with '#' between each sub-dict so we can split on them
df = df.withColumn('array', F.regexp_replace('array', '","', '"#"' ))
df = df.withColumn('array', F.split('array', '#'))
df.show(truncate=False)

Here are the intermediate results:

+----+---------------------------------------------------------+-----------------------------------------------------------+
|I   |F                                                        |array                                                      |
+----+---------------------------------------------------------+-----------------------------------------------------------+
|id01|{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}|["P1":"1:0.01", "P2":"3:0.03,4:0.04", "P3":"3:0.03,4:0.04"]|
+----+---------------------------------------------------------+-----------------------------------------------------------+

  3. Now generate one row for each sub-dict
# generate one row for each element in the array
df = df.withColumn('exploded', F.explode(df['array']))

# distinguish the ':' between key and value (quoted as '":"') from the ':' inside the values
df = df.withColumn('exploded', F.regexp_replace('exploded', '":"', '"#"' ))
df = df.withColumn('exploded', F.split('exploded', '#'))

# extract the name and value
df = df.withColumn('P', F.col('exploded')[0])
df = df.withColumn('N', F.col('exploded')[1])
df.select('I', 'exploded', 'P', 'N').show(truncate=False)

The final output:

+----+-----------------------+----+---------------+
|I   |exploded               |P   |N              |
+----+-----------------------+----+---------------+
|id01|["P1", "1:0.01"]       |"P1"|"1:0.01"       |
|id01|["P2", "3:0.03,4:0.04"]|"P2"|"3:0.03,4:0.04"|
|id01|["P3", "3:0.03,4:0.04"]|"P3"|"3:0.03,4:0.04"|
+----+-----------------------+----+---------------+
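To sanity-check the chain of regexp_replace/split calls, the same transformations can be replayed on the raw string in plain Python (a sketch of the string logic, not part of the Spark job):

```python
import re

s = '{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}'

# step 2: remove '{' and '}'
s = re.sub(r'[{}]', '', s)

# mark only the commas *between* sub-dicts ('","'), then split on the marker;
# commas inside a value such as "3:0.03,4:0.04" are left untouched
array = s.replace('","', '"#"').split('#')

# step 3: split each element on '":"' to separate the key from the value
rows = [elem.split('":"') for elem in array]
print(rows)
# [['"P1', '1:0.01"'], ['"P2', '3:0.03,4:0.04"'], ['"P3', '3:0.03,4:0.04"']]
```

Note the double quotes that survive on P and N, matching the quotes visible in the final show() output; they can be stripped with one more regexp_replace.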

12 Comments

  • Thanks for the answer. It is very similar to what I already put as the answer.
  • It doesn't use a UDF, which should be faster in general.
  • What if F.col('exploded') is NULL and F.col('exploded')[0] and F.col('exploded')[1] are not valid?
  • Such special cases can be handled with regexp_replace etc. A UDF would need to do similar things as well.
  • If there's a NULL in the array, explode puts the NULL together with "P3". So this: ["P1":"1:0.01", "P2":"3:0.03,4:0.04", "P3":"3:0.03,4:0.04", null] becomes "P3"|"3:0.03,4:0.04", null. You'll have to handle the extra null there.
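A sketch of guarding that extra null before indexing into the array; this is a plain-Python mirror of the F.col('exploded')[0]/[1] lookups (in Spark proper, a F.when(F.size('exploded') >= 2, ...) guard would play the same role):

```python
def extract_pair(exploded):
    # mirror of exploded[0] / exploded[1]; tolerate None and short arrays
    if not exploded or len(exploded) < 2:
        return None, None
    return exploded[0], exploded[1]

print(extract_pair(['"P3', '3:0.03,4:0.04"']))  # ('"P3', '3:0.03,4:0.04"')
print(extract_pair(None))                       # (None, None)
```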

This is how I solved this at the end:

 # This method replaces the commas between sub-dicts with ';'
 # to distinguish them from the other commas in the string, so we can split on it
 def _comma_replacement(val):
     if val:
         val = val.replace('","', '";"').replace('{', '').replace('}', '')
     return val

from pyspark.sql.functions import UserDefinedFunction, col, split, explode
from pyspark.sql.types import ArrayType, StringType

replacing = UserDefinedFunction(lambda x: _comma_replacement(x))
new_df = df.withColumn("F", replacing(col("F")))
new_df = new_df.withColumn("F", split(col("F"), ";").cast(ArrayType(StringType())))
exploded_df = new_df.withColumn("F", explode("F"))
df_sep = exploded_df.withColumn("F", split(col("F"), '":"').cast(ArrayType(StringType())))
dff = df_sep.withColumn("P", df_sep["F"].getItem(0))
dff_new = dff.withColumn("N", dff["F"].getItem(1))
dff_new = dff_new.drop('F')

Using another UDF, I removed the extra quote characters that remained from the string manipulation.

The accepted answer uses the same approach. The key idea is to distinguish the commas between different components from the commas inside them. For that, I suggested the _comma_replacement(val) method, which is called in a UDF. The answer above does the same thing using regexp_replace, which can be better optimized.

