
I want to read in an ndjson file and apply a pre-defined schema to it (not allow Spark to infer the schema). In general terms this works fine, but we are unable to mark certain fields as required.

Here's a really simplistic ndjson file:

{"name": "foo", "id": 1}
{"name": "bar"}

And here's some sample code:

from pyspark.sql.types import StructType, StructField, StringType

babySchema = StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), True)
])

df = spark.read \
    .schema(babySchema) \
    .option("mode","permissive") \
    .option("columnNameOfCorruptRecord","_corrupt_record") \
    .json("/path/to/*.json")
df.show()

+----+----+
|  id|name|
+----+----+
|   1| foo|
|null| bar|
+----+----+

Even though the id field is declared non-nullable (StructField("id", StringType(), False)), Spark happily processes the record and just sets the field to null.

How do I enforce that nullability, so that ideally that record would end up in the _corrupt_record column? I tried all three modes (permissive, failfast, dropmalformed) and saw no difference.

1 Answer


As far as I can tell, Spark does not consider missing fields (or type mismatches) to be corrupt records; it only flags syntax errors in the JSON itself. Also, if you want a corrupt record to appear, you need to add _corrupt_record as a field in the schema.

I created the following ndjson for a little more coverage:

{"name": "foo", "id": "1"}
{"name": "bar"}
{"name": "foobar",}
{"name": "foobar","id": 2}

Record 3 is clearly corrupt, and when reading this file Spark writes that line to the _corrupt_record field, but Record 2 populates id with NULL as you observed. Record 4 has its numeric id cast to a string.

I am not sure if this will work for your use case, but we could add a rule that checks whether any of a list of required fields is NULL and, if so, writes an entry to the _corrupt_record field to indicate it. For example:

from functools import reduce

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

babySchema = StructType([
    StructField("name", StringType(), True),
    StructField("id", StringType(), False),
    StructField("_corrupt_record", StringType(), True),
])

required_fields = ["id", "name"]

# Build a single boolean condition: True if any required field is NULL
any_null_condition = reduce(
    lambda acc, col_name: acc | F.col(col_name).isNull(),
    required_fields[1:],
    F.col(required_fields[0]).isNull()
)

df = spark.read \
    .schema(babySchema) \
    .option("mode", "permissive") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("sample.ndjson") \
    .withColumn(
        '_corrupt_record',
        # Keep the parser's value when the JSON itself is corrupt; otherwise
        # flag rows where any required field came back NULL
        F.coalesce(
            F.col('_corrupt_record'),
            F.when(any_null_condition, F.lit("Invalid field")).otherwise(F.lit(None))
        )
    )

Result:

+------+----+-------------------+
|name  |id  |_corrupt_record    |
+------+----+-------------------+
|foo   |1   |NULL               |
|bar   |NULL|Invalid field      |
|NULL  |NULL|{"name": "foobar",}|
|foobar|2   |NULL               |
+------+----+-------------------+
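
If the goal is then to quarantine those rows, a short follow-up step can split the frame on that column. Just a sketch, reusing the df built above:

# Rows flagged either by the JSON parser or by the required-field check
bad_rows = df.filter(F.col("_corrupt_record").isNotNull())

# Clean rows, with the helper column dropped
good_rows = df.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")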

2 Comments

Yes, I can trap for it manually, but the goal was to catch it as part of parsing the JSON, so the entire record goes into the _corrupt_record field. But it seems Spark/Databricks just ignores the nullability. Which is a bummer.
Yeah, I was thinking you could convert the row into a dictionary to "reconstruct" the record, so that instead of Invalid field you would get {"name": "bar", "id": NULL}, but I agree that's definitely not ideal. Spark could implement some custom logic so that the user can decide what constitutes a corrupt record, but that would probably have to be a feature request.
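
A minimal sketch of that reconstruction idea, reusing required_fields and any_null_condition from the answer above. to_json and struct are standard pyspark.sql.functions; the ignoreNullFields write option (assumed available, Spark 3.0+) keeps null fields in the output, which to_json would otherwise drop:

# Serialize the parsed columns back to a JSON string, keeping null fields, so a
# record missing id would appear as {"id":null,"name":"bar"} instead of "Invalid field"
reconstructed = F.to_json(
    F.struct(*required_fields),
    {"ignoreNullFields": "false"}
)

df = df.withColumn(
    "_corrupt_record",
    F.coalesce(
        F.col("_corrupt_record"),
        F.when(any_null_condition, reconstructed)  # stays NULL when nothing is missing
    )
)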
