
I have a PySpark dataframe in which the important information is stored in a column of JSON strings that share a similar, but inconsistent, schema. Three questions arise from my issue, walked through below:

  1. To flatten a dataframe with a JSON string column, should one create a new column of structs and then use explode?
  2. Do UDFs pass individual cell values to the function they wrap?
  3. How can I load JSON arrays of varying length, with varying fields per entry, into a single column?

This happens in both Databricks and a local install of PySpark.

An MWE of this table can be generated by this code:

from pyspark.sql.functions import from_json
from pyspark.sql.types import *

jstring_A = """[
{"a_id":"0001","a_s":"apple","score":1},
{"a_id":"0002","a_s":"banana","score":1},
{"a_id":"0003","a_s":"carrot","score":1}
]""".replace('\n','')
# This has 3 responses, each of the fields ["a_id", "a_s", "score"]

jstring_B = """[
{"a_id":"so1","a_R":"aardvark","score":5},
{"a_id":"so2","a_R":"baboon","score":9}
]""".replace('\n','')
# This has 2 responses, each of the fields ["a_id", "a_R", "score"]

data = [(1, jstring_A), (2, jstring_B)]
columns = ["_oid", "json_str"]
source_df = spark.createDataFrame(data=data, schema = columns)

which generates this table

+----+---------------------------------------------------------------------------------------------------------------------------+
|_oid|json_str                                                                                                                   |
+----+---------------------------------------------------------------------------------------------------------------------------+
|1   |[{"a_id":"0001","a_s":"apple","score":1},{"a_id":"0002","a_s":"banana","score":1},{"a_id":"0003","a_s":"carrot","score":1}]|
|2   |[{"a_id":"so1","a_R":"aardvark","score":5},{"a_id":"so2","a_R":"baboon","score":9}]                                        |
+----+---------------------------------------------------------------------------------------------------------------------------+

My ultimate goal is to flatten this dataframe. My understanding is that the best way to do this is to convert the string column to a struct inside the dataframe and then use explode. This would create 5 rows (one per response), each with the columns _oid, json_str, a_id, a_s, a_R, and score.

Is this process correct?
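
To make that concrete, here is a rough sketch of what I am imagining (the ArrayType wrapper and the names response/flattened are just my guesses at how this would look, using the combined schema all_fields that I derive below):

from pyspark.sql.functions import explode, from_json, col
from pyspark.sql.types import ArrayType

# sketch only: all_fields is the per-response struct schema inferred further down;
# wrapping it in ArrayType should describe the whole JSON array, and fields missing
# from a given response (a_s or a_R) should come back as null
flattened = (source_df
  .withColumn("response", explode(from_json(col("json_str"), ArrayType(all_fields))))
  .select("_oid", "json_str", "response.*"))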

In an effort to flatten, I found this excellent question, which provided a way to get all the field names in a schema. That question explained that any schema fields missing from a record would simply be loaded as null.

This produces the following code

all_fields = spark.read.json(source_df.select("json_str").rdd.map(lambda x: x[0])).schema

and resulting schema

StructType(List(StructField(a_R,StringType,true),StructField(a_id,StringType,true),StructField(a_s,StringType,true),StructField(score,LongType,true)))

At this point our questions diverge: there is a variable number of responses in each JSON string here, so we cannot simply use that schema as-is. Because the schema is not constant across the column, I reasoned that we cannot hand from_json the column itself; however, it can be passed a string.

For this, I resorted to using .withColumn and udf.

import json as pj
from pyspark.sql.functions import udf, col

def create_struct(json_str):
  # build a schema with one struct field (named "0", "1", ...) per response
  # found in this particular JSON string
  n_responses = len(pj.loads(json_str))
  schema = StructType(
    [StructField(name=f"{i}", dataType=all_fields, nullable=True)
     for i in range(n_responses)]
  )
  return from_json(json_str, schema)

json_as_struct = udf(lambda z: create_struct(z))

new_df = source_df.withColumn("as_struct", json_as_struct(col("json_str")))

Per my understanding of UDFs, Spark will distribute the iteration here and build the as_struct column cell by cell, passing each row's json_str value to the UDF. Is this how UDFs work?
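
To illustrate what I mean, here is a toy example of how I picture a UDF behaving (not my real code; the n_responses column is purely illustrative):

import json as pj
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

# as I understand it, the wrapped lambda is called once per row with the plain
# Python string from json_str, and returns a plain Python value whose type matches
# the returnType declared on the udf (StringType if none is given)
count_responses = udf(lambda s: len(pj.loads(s)), IntegerType())
source_df.withColumn("n_responses", count_responses(col("json_str"))).show()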

The first sign of trouble from this code is new_df itself. Before any action is evaluated, its schema is

DataFrame[_oid: bigint, json_str: string, as_struct: string]

as one would expect, given that the default return type of a UDF is string.

When an action (new_df.display()) is called, the real error occurs:

PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:

Full traceback is below.

This leads to my last question: can I use a UDF to transform these JSON strings to structs, and if so, what changes should I make to my code for it to function?
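
For instance, would something along these lines be the right fix: parsing with plain Python inside the UDF and declaring the return type, rather than calling from_json there? (This is only a guess on my part.)

import json as pj
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType

# guess at a fix: parse in plain Python and let the declared return type (an array
# of the all_fields struct) describe the result; keys that are absent in a given
# response (a_s or a_R) would presumably come back as null
parse_json = udf(lambda s: pj.loads(s), ArrayType(all_fields))
new_df = source_df.withColumn("as_struct", parse_json(col("json_str")))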


Full traceback:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2033.0 failed 4 times, most recent failure: Lost task 2.3 in stage 2033.0 (TID 32875, 10.60.162.7, executor 167): org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
    for item in iterator:
  File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
    return f(*args, **kwargs)
  File "<command-124...>", line 19, in <lambda>
  File "<command-124...>", line 17, in create_struct
  File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2339)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:298)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:58)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2994)
    at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:2985)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3709)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3707)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2984)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.generateTableResult(PythonDriverLocal.scala:1158)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:1070)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:939)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:538)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:899)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$8(PythonDriverLocal.scala:384)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:371)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:431)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:408)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
    for item in iterator:
  File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
    return f(*args, **kwargs)
  File "<command-124...>", line 19, in <lambda>
  File "<command-124...>", line 17, in create_struct
  File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

1 Answer

A senior dev came up with the following solution for me:

  1. No; one can use regexp_replace in conjunction with functools.reduce to explode directly from the JSON string.
  2. We did not discuss this.
  3. The following code achieves the desired result:
import pyspark.sql.functions as f
from functools import reduce

def explode_col(source_df, colname):

  # union of all field names across the column, found by inferring a schema
  keys = spark.read.json(source_df.select(colname).rdd.map(lambda x: x[0])).columns

  # strip the brackets and quotes, turn the "},{" separators into "}.{",
  # split on the dot, and explode so that each response gets its own row
  source_df = source_df\
    .withColumn("json_trimmed", f.regexp_replace(f.col(colname), r'\[|\]', ''))\
    .withColumn("json_trimmed", f.regexp_replace(f.col("json_trimmed"), '"', ''))\
    .withColumn("json_rep", f.regexp_replace(f.col("json_trimmed"), r'\},\{', r'\}.\{'))\
    .withColumn("json_array", f.split(f.col("json_rep"), r'\.'))\
    .withColumn("exploded", f.explode("json_array"))

  # for each key, pull out the value that follows "<key>:" up to the next "," or "}"
  pat = r"(?<=%s:)\w+(?=(,|}))"
  exploded_col_df = reduce(
    lambda df, c: df.withColumn(
      c,
      f.when(
        f.col("exploded").rlike(pat % c),
        f.regexp_extract("exploded", pat % c, 0)
      )
    ),
    keys,
    source_df
  )
  return exploded_col_df.drop(colname, "json_trimmed", "json_rep", "json_array", "exploded")

explode_col(source_df, "json_str").show(truncate=False)

1 Comment

I don't think you should listen to your "senior dev"... I would strongly advise against parsing JSON using a regex... Ideally, you should parse the JSON into a struct (inferring the schema or specifying it) and then explode it. If you really want to do it "dynamically", you'd better use Spark JSON functions.
