I have a JSON-like structure in Spark that looks as follows:
>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
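For reference, a single record under this schema looks roughly like this (the values are made up for illustration):

{"id": "1", "some-array": [{"array-field-1": "foo", "array-field-2": "bar"}]}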
Depending on the partition, some-array might be an empty array for all ids. When this happens, Spark infers the following schema:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string
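This happens for records where the array is empty, so Spark has nothing to infer the element type from and falls back to string. It is exactly the jsonKO record used in the reproducible example at the bottom:

{"id": "OK", "some-array": []}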
Of course, that's a problem if I want to read multiple partitions at once, because Spark cannot merge the two schemas. So I have tried to manually define the schema, which should avoid the problem:
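(good_schema is not shown above; it is the same StructType as myschema in the reproducible example at the end of the question, roughly:)

from pyspark.sql.types import *

good_schema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])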
>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
So far so good, but when I try to actually collect the data I get an error:
>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
I don't understand why this fails. There should be no incompatibility caused by the schema. In case you're wondering, collecting the data without specifying the schema works:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
Edit
Here is a reproducible example in Python:
from pyspark.sql.types import *
myschema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])
path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'  # record with an empty array

dfKO = sc.parallelize([jsonKO])            # sc is the SparkContext
dfKO = spark.read.json(dfKO)               # schema is inferred: element becomes string
dfKO.write.parquet(path_writeKO)           # write without schema

read_error = spark.read.schema(myschema).parquet(path_writeKO)  # read with schema
read_error.collect()                       # Fails!!
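For contrast, here is a sketch of the same round trip for a partition where the array is populated (the path and values are placeholders); reading it back with myschema works, matching the good_partition_path case above:

path_writeOK = "path/to/parquet_ok"        # placeholder path
jsonOK = '{"id": "OK", "some-array": [{"array-field-1": "a", "array-field-2": "b"}]}'

dfOK = spark.read.json(sc.parallelize([jsonOK]))  # element is inferred as a struct here
dfOK.write.parquet(path_writeOK)

read_ok = spark.read.schema(myschema).parquet(path_writeOK)
read_ok.collect()                          # works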