
EDIT: Sorry about the previous question's quality, I hope this one is clearer. With my Spark application I'm loading a whole directory of JSON files like the following:

    {
        "type": "some_type",
        "payload": {
            "data1": {
                "id": "1"           
            },
            "data2": {
                "id": "1",

            },
            "data3": {
                "id": "1"
            },
            "dataset1": [{
                "data11": {
                    "id": "1",
                },
                "data12": {
                    "id": "1",
                }
            }],
            "masterdata": {
                "md1": [{
                    "id": "1"
                },
                {
                    "id": "2"
                },
                {
                    "id": "3"
                }],
                "md2": [{
                    "id": "1",
                },
                {
                    "id": "2",
                },
                {
                    "id": "3",
                }]
            }
        }
    }

into a DataFrame and save it as a temp table in order to use it later. In this JSON, the fields of the "payload" node are always present, but the subnodes of "masterdata" are optional. The next step is to create a separate DataFrame for each subnode of the JSON, like this: the DataFrame data1 contains the data of node "data1" from all files and looks like a regular table with a column "id". After this first processing part my Spark state is as follows: DataFrames: data1(id), data2(id), data3(id), data11(id), data12(id), md1(id), md2(id)
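
For reference, the loading and temp-table part looks roughly like this (the directory path and variable names below are placeholders rather than my exact code):

    // assuming Spark 1.x, load the whole directory of JSON files and register it as a temp table
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val jsonData = hiveContext.read.json("/path/to/json/dir")   // placeholder path
    jsonData.registerTempTable("jsonData")

    // then one DataFrame per subnode, e.g. data1(id):
    val data1 = hiveContext.sql("SELECT payload.data1.id FROM jsonData")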

Here comes the problem: if one of the JSON files in the directory doesn't contain the md2 node, I cannot run either show() or collect() on the "md2" DataFrame due to a NullPointerException. I would understand it if all files were missing the md2 node, so the md2 DataFrame could not be created, but in this case I expect the md2 DataFrame to simply not contain data from the JSON file that lacks the md2 node but contains all the others.
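
For illustration, a file shaped like this (same structure as above, just without the md2 node under masterdata) is enough to trigger the failure:

    {
        "type": "some_type",
        "payload": {
            "data1": { "id": "1" },
            "data2": { "id": "1" },
            "data3": { "id": "1" },
            "dataset1": [{
                "data11": { "id": "1" },
                "data12": { "id": "1" }
            }],
            "masterdata": {
                "md1": [{ "id": "1" }]
            }
        }
    }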

Technical details: to read data from a nested node I'm using rdd.map and rdd.flatMap, and then I'm converting the result to a DataFrame with custom column names.

If I run the application when all files in the directory contain all nodes, everything works, but if a single file is missing the md2 node, the app fails on .show() or .collect().

BTW, if the node exists but is empty, everything works fine.

Is there any way to make Spark support optional JSON nodes, or to handle missing nodes within rdd.map & rdd.flatMap?

I hope this is clearer than the previous question.

At @Beryllium's request, here are the RDD operations that I'm using to get the md2 DataFrame:

    val jsonData = hiveContext.sql("SELECT `payload`.masterdata.md2 FROM jsonData")
    val data = jsonData.rdd
      .flatMap(row => row.getSeq[Row](0))
      .map(row => row.getString(row.fieldIndex("id")))
      .distinct
    val dataDF = data.toDF("id")
  • Possible duplicate of What is a Null Pointer Exception, and how do I fix it? Commented Nov 27, 2015 at 9:19
  • @PetterFriberg with all due respect, the fact that it throws an NPE does not make it a duplicate; it is not one in this case. Commented Nov 27, 2015 at 9:23
  • @Silverrose you need to provide an MCVE so we can help! Commented Nov 27, 2015 at 9:23
  • @eliasah Sorry, but as the question is currently posted, I think this is the best answer one can give. Commented Nov 27, 2015 at 9:25
  • Still not sufficient; please add the rdd.map/flatMap calls and the conversion to the DataFrame - I cannot reproduce your problem using plain sqlContext.read.json. Commented Nov 27, 2015 at 10:10

1 Answer


Quick fix

Try to insert a filter() like this:

sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData")
  .rdd
  .filter(_.getSeq[Row](0) != null)
  .flatMap(row => row.getSeq[Row](0))
  .map(row => (row.getString(row.fieldIndex("id"))))
  .distinct
  .toDF("id")
  .show()
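
If you would rather handle the missing node inside the flatMap itself (as you asked), wrapping the field access in an Option should work too - an untested sketch along the same lines:

sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData")
  .rdd
  .flatMap(row => Option(row.getSeq[Row](0)).getOrElse(Seq.empty))  // a missing/null md2 becomes an empty Seq
  .map(row => row.getString(row.fieldIndex("id")))
  .distinct
  .toDF("id")
  .show()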

Using explode()

This removes the null values as early as possible, so it should be faster (at the very least it's shorter):

sqlContext
  .sql("select t.a.id from (SELECT explode(payload.masterdata.md2) as a FROM jsonData) t")

  • explode() explodes away the null values.
  • Then the subquery extracts only the id.

Even simpler: Extract ID first, then explode():

sqlContext.sql("SELECT explode(payload.masterdata.md2.id) FROM jsonData").show()

4 Comments

@Silverrose This is just a quick hack; please tell me if it returns the expected result - there are probably better ways.
@up Thanks, that did the job. I thought it would automatically ignore empty values since the schema was "ready" for optional nodes. The thing that is still unclear to me is why the exception is thrown on .show() and not during processing? It lets me create the DataFrame and use it later on, but throws the exception once I try to display this DF or another one based on it.
The schema is fine - the map fails. And since an RDD is evaluated lazily, you see the error in show(), because that is where the result is eventually materialized.
I'll keep the explode solution in mind if I run into performance issues; so far rdd.map is fine. Thanks a lot!
