
Input:

caseid object_value
1 [{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]
2 [{'dummyAcc':'12347','accountRequest':{'schemeCode':'ZEROQ2', 'CCZ':'SGD'}}]
3 [{'dummyAcc':'12348','accountRequest':{'schemeCode':'ZEROQ5', 'CCZ':'SGD'}}]
4 [{'dummyAcc':'12349','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]
5 [{'dummyAcc':'12350','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]

Output:

caseid schemeCode CCZ
1 ZEROQ1 SGD
2 ZEROQ2 SGD
3 ZEROQ5 SGD
4 ZEROQ SGD
5 ZEROQ SGD

Kindly guide me in achieving this output in Spark. I am able to do this in Python on a small data sample, but I need to do it in Spark due to the data volume in production. Thanks in advance.
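For reference, a minimal sketch of the kind of single-machine Python approach described above (the asker's actual code is not shown; ast.literal_eval is assumed here because the sample values use single quotes):

import ast
import pandas as pd

rows = {
    1: "[{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]",
    2: "[{'dummyAcc':'12347','accountRequest':{'schemeCode':'ZEROQ2', 'CCZ':'SGD'}}]",
}

records = []
for caseid, raw in rows.items():
    # ast.literal_eval parses the single-quoted, JSON-like string into a list of dicts
    account_request = ast.literal_eval(raw)[0]['accountRequest']
    records.append({'caseid': caseid,
                    'schemeCode': account_request['schemeCode'],
                    'CCZ': account_request['CCZ']})

print(pd.DataFrame(records))

This works on a small sample but runs on a single machine, which is why a Spark-native approach is needed for the production volume.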

  • If the examples you gave here are representative of your use case, the fastest approach is to use PySpark's native regexp_extract function. If your data is very complex, you may need to write a UDF that parses the strings into dictionaries and searches them. Commented Jul 15, 2021 at 13:43
  • will regexp_extract work on JSON? Commented Jul 15, 2021 at 13:51
  • Sure. JSONs are just strings. Here's an example of using it to find the first word following a specific search word: stackoverflow.com/a/46547701/1884171 Commented Jul 15, 2021 at 14:01
  • the problem is that your data is not valid JSON. Valid JSON uses double quotes. Commented Jul 15, 2021 at 14:23
  • @Steven, sorry, my bad, the data is in double quotes. Commented Jul 16, 2021 at 3:21

3 Answers


You might use get_json_object; it's straightforward:

import pyspark.sql.functions as f

df = spark.createDataFrame([
  [1, """[{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]"""],
  [2, """[{'dummyAcc':'12347','accountRequest':{'schemeCode':'ZEROQ2', 'CCZ':'SGD'}}]"""],
  [3, """[{'dummyAcc':'12348','accountRequest':{'schemeCode':'ZEROQ5', 'CCZ':'SGD'}}]"""],
  [4, """[{'dummyAcc':'12349','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]"""],
  [5, """[{'dummyAcc':'12350','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]"""]
], schema='caseid int, object_value string')

final_df = (df
            .select('caseid', 
                    f.get_json_object('object_value', '$[*].accountRequest.schemeCode').alias('schemeCode'),
                    f.get_json_object('object_value', '$[*].accountRequest.CCZ').alias('CCZ')))

final_df.show(truncate=False)
# +------+----------+-----+
# |caseid|schemeCode|CCZ  |
# +------+----------+-----+
# |1     |"ZEROQ1"  |"SGD"|
# |2     |"ZEROQ2"  |"SGD"|
# |3     |"ZEROQ5"  |"SGD"|
# |4     |"ZEROQ"   |"SGD"|
# |5     |"ZEROQ"   |"SGD"|
# +------+----------+-----+
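Note that with the $[*] path the matched values keep their JSON quoting, as the output above shows. If bare strings are needed, one option (an addition here, not part of the original answer, so verify it on your Spark version) is to index the first array element, which returns the scalar without the quotes:

final_df = (df
            .select('caseid',
                    f.get_json_object('object_value', '$[0].accountRequest.schemeCode').alias('schemeCode'),
                    f.get_json_object('object_value', '$[0].accountRequest.CCZ').alias('CCZ')))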



A coworker once told me that regexp_extract is faster than parsing the JSONs, and I had always believed that... until today, when I decided to run some timing experiments comparing it to the two other solutions posted here, which use get_json_object and from_json.

The short answer is that all three perform comparably, even when we complicate the JSONs by adding thousands of extra key-value pairs. The regexp_extract method is actually consistently a bit slower in these tests.

Setup: proving each method works

import json
import time

import pandas as pd
import pyspark.sql.functions as fun
import pyspark.sql.types as t

case_ids = range(1,6)
data =  [
  '{"dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1", "CCZ":"SGD"}}',
  '{"dummyAcc":"12347","accountRequest":{"schemeCode":"ZEROQ2", "CCZ":"SGD"}}',
  '{"dummyAcc":"12348","accountRequest":{"schemeCode":"ZEROQ5", "CCZ":"SGD"}}',
  '{"dummyAcc":"12349","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}',
  '{"dummyAcc":"12350","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}'
]

df = spark.createDataFrame(pd.DataFrame({"caseid": case_ids, "object_value": data}))

##
# fun.from_json
##
schm = t.StructType(
    [
        t.StructField("dummyAcc", t.StringType()),
        t.StructField(
            "accountRequest",
            t.StructType(
                [
                    t.StructField("schemeCode", t.StringType()),
                    t.StructField("CCZ", t.StringType()),
                ]
            ),
        ),
    ]
)

def run_from_json(df):
  return df.withColumn("object_value", fun.from_json("object_value", schm, options={"allowSingleQuotes": "true"}))\
          .select(
            "caseid",
            "object_value.accountRequest.schemeCode",
            "object_value.accountRequest.CCZ",
        )

##
# get_json
##

def run_get_json(df):
  return df.select('caseid', 
                    fun.get_json_object('object_value', '$.accountRequest.schemeCode').alias('schemeCode'),
                    fun.get_json_object('object_value', '$.accountRequest.CCZ').alias('CCZ'))


##
# regexp_extract
##

def run_regexp_extract(df):
  return df.withColumn("schemeCode", fun.regexp_extract(fun.col("object_value"), r'(.)("schemeCode":")(\w+)', 3))\
    .withColumn("CCZ", fun.regexp_extract(fun.col("object_value"), r'(.)("CCZ":")(\w+)', 3))\
    .select("caseid", "schemeCode", "CCZ")

##
# Test them out
##

print("from_json")
run_from_json(df).show(truncate=False)

print("get_json")
run_get_json(df).show(truncate=False)

print("regexp_extract")
run_regexp_extract(df).show(truncate=False)


from_json
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+

get_json
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+


regexp_extract
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+

Timing Part 1 -- Using Short JSONs

I checked the wall-clock time of running multiple iterations using the default compact JSONs defined above.

def time_run_method(df, n_it, meth, meth_name):
  t0 = time.time()
  for i in range(n_it):
    meth(df).count()
  td = time.time() - t0
  print(meth_name)
  print("Time to count %d iterations: %s [sec]" % (n_it, "{:,}".format(td)))
  
for m, n in zip([run_from_json, run_get_json, run_regexp_extract], ["from_json", "get_json", "regexp_extract"]):
  time_run_method(df, 200, m, n)


from_json
Time to count 200 iterations: 15.918861389160156 [sec]

get_json
Time to count 200 iterations: 15.668830871582031 [sec]

regexp_extract
Time to count 200 iterations: 17.539576292037964 [sec]

Timing Part 2 -- Using Long JSONs

I added two thousand key-value pairs to the JSONs to see whether the extra overhead of deserializing them would change things. It did not. Perhaps this structure is too simple and the internal parsers are able to skip the extra keys, or the extra keys just don't add much overhead given how flat the structure is. I don't know.

cruft = json.dumps({k:v for k,v in enumerate(range(2000))})

data = [
  '{ "cruft": %s, "dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12347","accountRequest":{"schemeCode":"ZEROQ2", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12348","accountRequest":{"schemeCode":"ZEROQ5", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12349","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12350","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}' % cruft
]

df2 = spark.createDataFrame(pd.DataFrame({"caseid": case_ids, "object_value": data}))

for m, n in zip([run_from_json, run_get_json, run_regexp_extract], ["from_json", "get_json", "regexp_extract"]):
  time_run_method(df2, 200, m, n)


from_json
Time to count 200 iterations: 16.005220413208008 [sec]
get_json
Time to count 200 iterations: 15.788024187088013 [sec]
regexp_extract
Time to count 200 iterations: 16.81353187561035 [sec]

8 Comments

Amazing answer, I had never stopped to think about which has the best performance
It just takes one slow day to exercise some curiosity :-)
Hi @andrew, thank you so much for this wonderful explanation, I learnt something today. I will go with regexp_extract, as there are 500 K:V pairs; I just took a sample and posted it here.
Two questions: 1. What is the 3 at the end of the regexp_extract calls? 2. What if the schemeCode is like "schemeCode":"ZEROQ1 account (ABCD)" -- how will the regex change?
The 3 is the group number. regexp_extract captures all the groups specified in the regex. In this case we want the third group captured.
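To illustrate (a sketch added here, not part of the original comments): the last argument of regexp_extract selects which capture group to return, and widening \w+ to [^"]+ is one way to handle values that contain spaces or parentheses, such as "ZEROQ1 account (ABCD)":

import pyspark.sql.functions as fun

df_v = spark.createDataFrame(
    [[1, '{"accountRequest":{"schemeCode":"ZEROQ1 account (ABCD)", "CCZ":"SGD"}}']],
    schema='caseid int, object_value string')

# group 3 is the value captured by [^"]+, i.e. everything up to the closing quote
df_v.select(
    fun.regexp_extract('object_value', r'(.)("schemeCode":")([^"]+)', 3).alias('schemeCode')
).show(truncate=False)
# +---------------------+
# |schemeCode           |
# +---------------------+
# |ZEROQ1 account (ABCD)|
# +---------------------+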

To extract JSON-like data, use the function from_json. It requires a schema as input. Your JSON is also malformed (it uses single quotes), so you need to add the option {"allowSingleQuotes": "true"}.

from pyspark.sql import functions as F, types as T

schm = T.StructType(
    [
        T.StructField("dummyAcc", T.StringType()),
        T.StructField(
            "accountRequest",
            T.StructType(
                [
                    T.StructField("schemeCode", T.StringType()),
                    T.StructField("CCZ", T.StringType()),
                ]
            ),
        ),
    ]
)

df.withColumn(
    "object_value",
    F.from_json("object_value", schm, options={"allowSingleQuotes": "true"}),
).select(
    "caseid",
    "object_value.accountRequest.schemeCode",
    "object_value.accountRequest.CCZ",
).show()

+------+----------+---+                                                         
|caseid|schemeCode|CCZ|
+------+----------+---+
|     1|    ZEROQ1|SGD|
|     2|    ZEROQ2|SGD|
|     3|    ZEROQ5|SGD|
|     4|     ZEROQ|SGD|
|     5|     ZEROQ|SGD|
+------+----------+---+
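One caveat worth adding here (not part of the original answer): the object_value strings in the question are wrapped in a JSON array ([{...}]). If from_json returns nulls because of that wrapper on your Spark version, a common workaround is to parse with an ArrayType schema and take the first element:

df.withColumn(
    "object_value",
    F.from_json("object_value", T.ArrayType(schm), options={"allowSingleQuotes": "true"}),
).select(
    "caseid",
    F.col("object_value").getItem(0)["accountRequest"]["schemeCode"].alias("schemeCode"),
    F.col("object_value").getItem(0)["accountRequest"]["CCZ"].alias("CCZ"),
).show()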

