
I have a JSON-like structure in Spark which looks as follows:

>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

Depending on the partition, some-array might be an empty array for all ids. When this happens, Spark infers the following schema:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string

Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem:

>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

So far so good, but when I try to actually collect the data I get an error:

>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group

I don't understand why this fails. There should be no incompatibility caused by the schema. In case you wonder, collecting the data without specifying the schema works.

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works

Edit

Here is a reproducible example in Python:

from pyspark.sql.types import *

myschema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO)  # write without an explicit schema

read_error = spark.read.schema(myschema).parquet(path_writeKO)  # read back with the good schema
read_error.collect()  # Fails!!

2 Answers


The solution I've found is setting the option dropFieldIfAllNull to True when reading the JSON file. This causes fields that contain only nulls or empty arrays to disappear from the inferred schema, which makes schema merging easier.

>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id: string

Now the undesired type inference no longer applies, and when reading multiple partitions of the same dataset the mergeSchema option will be able to read all files without collision.
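As a minimal end-to-end sketch of that idea (the base path, directory layout and second JSON document are hypothetical, and sc/spark come from an active PySpark session), writing both kinds of partitions with dropFieldIfAllNull and reading them back with mergeSchema could look like this:

json_good = '{"id": "OK", "some-array": [{"array-field-1": "f1", "array-field-2": "f2"}]}'
json_bad = '{"id": "OK", "some-array": []}'

# dropFieldIfAllNull removes some-array from the inferred schema of the "bad" document
df_good = spark.read.option('dropFieldIfAllNull', True).json(sc.parallelize([json_good]))
df_bad = spark.read.option('dropFieldIfAllNull', True).json(sc.parallelize([json_bad]))

# hypothetical partitioned layout under a common base path
df_good.write.parquet("path/to/parquet/part=good")
df_bad.write.parquet("path/to/parquet/part=bad")

# mergeSchema unifies the two file schemas; rows from the bad partition get some-array = null
merged = spark.read.option("mergeSchema", True).parquet("path/to/parquet")
merged.printSchema()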


3 Comments

@AlexandrosBiratsis Thanks! Sorry for accepting your answer and then stepping back :/
Nothing to worry about, since you found what you were looking for. Although I am not sure whether the schema will be automatically merged by Spark and extended to the one containing all the required information. Did you try it out?
@AlexandrosBiratsis I did try it out! When reading you need to set the mergeSchema option. This wouldn't work before because the schemas were incompatible, but with this approach the missing columns will be set to null.

Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem

I am afraid there is no single schema that can parse both of these cases simultaneously. The data {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]} can be parsed only with:

good_schema = StructType([
  StructField('id', StringType()), 
  StructField( 'some-array', 
              ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())
              ])
          ))
  ])

while {"id": "OK", "some-array": []} can be parsed only with:

bad_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', ArrayType(StringType()))
])

Therefore one option is to read these two directories with different schemas.
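For example, a minimal sketch using the two schemas above and the placeholder paths from the question:

# each directory is read with the schema that actually matches its files
good_df = spark.read.schema(good_schema).parquet(good_partition_path)
bad_df = spark.read.schema(bad_schema).parquet(bad_partition_path)

good_df.head(5)  # works: the elements really are structs
bad_df.head(5)   # works: the empty arrays match array<string>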

I don't understand why this fails. There should be no incompatibility caused by the schema.

As explained above, the data is incompatible with the schema.

In case you wonder, collecting the data without specifying the schema works.

This is the expected behaviour, since when no explicit schema is specified Spark will try to discover it from the data.

Suggested solution

The only solution that I can think of is to treat the some-array field as a string. I don't know whether this is feasible in your system, but you could implement it by casting some-array to string for both schemas/partitions.

This conversion can be done in at least two ways, given:

from pyspark.sql.functions import col

good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)

1. Read both datasets, convert the some-array field to string and save the results under one common directory:

good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))

good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")

2. Execute the above conversion at runtime, skipping the re-write step (a sketch is given after the full example below).

Finally, you can load some-array as a string and then convert it to array_schema using the from_json function:

from pyspark.sql.types import *
from pyspark.sql.functions import from_json

array_schema = ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())]))

# we will use this for both partitions
generic_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', StringType())
])

parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"

# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)

final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))

final_df.show(10, False)

# +---+------------------------+
# |id |some-array              |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[]                      |
# +---+------------------------+

final_df.printSchema()
# root
#  |-- id: string (nullable = true)
#  |-- some-array: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- array-field-1: string (nullable = true)
#  |    |    |-- array-field-2: string (nullable = true)
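For the second option (doing the conversion at runtime without re-writing the data), a possible sketch could look like the following. It reuses good_schema, bad_schema, array_schema and the placeholder partition paths from above, and it uses to_json instead of a plain cast, since the cast's string form is not valid JSON for from_json (to_json on array columns of any element type is supported in recent Spark versions):

from pyspark.sql.functions import col, to_json, from_json

# read each partition with the schema that matches it
good_data_df = spark.read.schema(good_schema).parquet(good_partition_path)
bad_data_df = spark.read.schema(bad_schema).parquet(bad_partition_path)

# serialise some-array to a JSON string so both sides have the same column type
good_str = good_data_df.withColumn("some-array", to_json(col("some-array")))
bad_str = bad_data_df.withColumn("some-array", to_json(col("some-array")))

# union and parse the string back into the array of structs
final_df = (good_str.union(bad_str)
            .withColumn("some-array", from_json(col("some-array"), array_schema)))
final_df.printSchema()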

5 Comments

Thanks for the answer. Nevertheless I still don't understand why {"id": "OK", "some-array": []} is parsed as StructField('some-array', ArrayType(StringType())). I'd expect that Spark wouldn't make any assumptions about the type of the elements.
This is a good question. Let me answer with the assumption that when an empty array is found, Spark converts it to array[string] by default.
@Ismor I believe the reason is that ArrayType always requires you to specify a DataType, please take a look for yourself here. The most generic DataType I can think of is StringType, because any other type (int, float, boolean) has a string representation. Therefore it does make sense to convert an empty array to array[string] by default.
I think so, but then why not leave the type as the general class ArrayType(DataType), since all Spark types inherit from that one...
Because you can't: DataType is abstract, so you must give a specific type which inherits from DataType (see the quick sketch below).
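To illustrate the point made in the last two comments, a tiny sketch (my own illustration, not from the thread): ArrayType cannot be constructed without a concrete element type.

from pyspark.sql.types import ArrayType, StringType

ArrayType(StringType())   # fine: a concrete element type is supplied

try:
    ArrayType()           # no element type given
except TypeError as e:
    print(e)              # elementType is a required positional argument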
