
Using Spark with Java, I have created a DataFrame from a comma-delimited source file. If the last column of a row in the source file is blank, the job throws an ArrayIndexOutOfBoundsException. Sample data and code are below. Is there any way I can handle this error? There is a good chance of blank values appearing in the last column. In the sample data below, the 4th row causes the issue.

Sample Data:

1,viv,chn,34
2,man,gnt,56
3,anu,pun,22
4,raj,bang,*

Code:

        JavaRDD<String> dataQualityRDD = spark.sparkContext().textFile(inputFile, 1).toJavaRDD();

        // Build a schema of nullable string columns from the space-separated column list.
        String schemaString = schemaColumns;
        List<StructField> fields = new ArrayList<>();
        for (String fieldName : schemaString.split(" ")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rowRDD = dataQualityRDD.map((Function<String, Row>) record -> {
            // split() drops trailing empty fields, so a row whose last column is blank
            // yields fewer items than the schema has columns.
            Object[] items = record.split(fileSplit);
            return RowFactory.create(items);
        });

1 Answer


I used Spark 2.0 and was able to read the CSV without any exception:

    SparkSession spark = SparkSession.builder().config("spark.master", "local").getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    JavaRDD<Row> csvRows = spark.read().csv("resources/csvwithnulls.csv").toJavaRDD();

    StructType schema = DataTypes.createStructType(
            new StructField[] { new StructField("id", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("age", DataTypes.StringType, false, Metadata.empty()) });

    Dataset<Row> newCsvRows = spark.createDataFrame(csvRows, schema);
    newCsvRows.show();

I used exactly the rows you have and it worked fine.

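If you would rather keep your textFile/split approach, a minimal sketch of the fix (assuming fileSplit is your comma delimiter and schema is the StructType you already build) is to pass -1 as the limit to split(), which preserves trailing empty fields:

    JavaRDD<Row> rowRDD = dataQualityRDD.map((Function<String, Row>) record -> {
        // limit -1 keeps trailing empty strings, so "4,raj,bang," still yields 4 items
        String[] items = record.split(fileSplit, -1);
        return RowFactory.create((Object[]) items); // pass the array so it is spread as varargs
    });

    Dataset<Row> dataQualityDF = spark.createDataFrame(rowRDD, schema);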


6 Comments

Based on the source file, I will be passing a SQL query that is dynamically generated by another job. The SQL query will help me get the column information along with the error records.
I am sorry if I misunderstood your question. In either case, you can create a Dataset as above and then pass in the query, which you already have in one of the params, I think? Any further transformations/actions can be performed by registering the Dataset as a temp table. So, in a nutshell, you have to maintain a mapping from source file to query plus other params, or something like that?
Thanks!! On top of the temp table I should pass the query, but some of the files will not have header information. In that scenario my job will fail due to a column-name mismatch. I dynamically create the SQL query based on the DDL. If there is no header, how can I handle this?
val b = spark.read.csv("/user/dfgs/spark_test/").toDF("name", "address", "id", "date") ... other than this, is there any way I can map column names?
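For files that do not have a header row, one option (a sketch, assuming you can build the StructType from the DDL your job already generates) is to pass an explicit schema to the CSV reader instead of renaming columns with toDF:

    // Read a headerless CSV; column names come from the schema, not the file.
    Dataset<Row> df = spark.read()
            .option("header", "false")
            .schema(schema)                      // StructType built from your DDL
            .csv("/user/dfgs/spark_test/");

    df.createOrReplaceTempView("source_table");  // hypothetical view name
    Dataset<Row> result = spark.sql(query);      // "query" = your dynamically generated SQL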
