
I have an input file (CSV) containing up to 20 columns. I have to filter the input file based on the number of columns: if a row contains 20 columns it is considered good data, otherwise bad data.

Input File:

123456,"ID_SYS",12,"Status_code","feedback","HIGH","D",""," ",""," ",""," 
",9999," ",2013-05-02,9999-12-31,"N",1,2

I am reading the file as an RDD, splitting each line on `,`, and checking whether the row contains 20 columns:

import org.apache.spark.sql.Row

val rdd = SparkConfig.spark.sparkContext.textFile(CommonUtils.loadConf.getString("conf.inputFile"))
// Note: split(",") drops trailing empty fields; split(",", -1) would keep them
val splitRDD = rdd.map(line => Row.fromSeq(line.split(",")))
val goodRDD = splitRDD.filter(row => row.size == 20)
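For completeness, the bad rows could be counted with the complementary filter (same 20-column rule; the variable names here are just for illustration):

val badRDD = splitRDD.filter(row => row.size != 20)   // rows that did not split into exactly 20 columns
val badRowCount = badRDD.count()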

I have to convert goodRDD into a DataFrame/Dataset to apply some transformations. I tried the code below:

val rowRdd = splitRDD.map {
                 case Array(c1, c2, c3 .... c20) => Row(c1.toInt, c2 ....)
                 case _ => badCount++
               }
val ds = SparkConfig.spark.sqlContext.createDataFrame(rowRdd, inputFileSchema)

Since I have 20 columns, do I have to write out all 20 columns in the pattern match? I would like to know the best way / the right solution.
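To make the question concrete, this is roughly the direction I am imagining instead of a 20-arm pattern match (the c1..c20 column names are placeholders, and only one cast is shown):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// All-string schema: one StructField per expected column; the names are made up
val stringSchema = StructType((1 to 20).map(i => StructField(s"c$i", StringType, nullable = true)))

// goodRDD already holds Rows of 20 string fields, so it can go straight to createDataFrame;
// individual columns get cast afterwards instead of inside a pattern match
val rawDf = SparkConfig.spark.createDataFrame(goodRDD, stringSchema)
val typedDf = rawDf.withColumn("c1", rawDf("c1").cast("long"))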

  • A plain CSV reader with mode set to DROPMALFORMED usually makes more sense (as per stackoverflow.com/questions/29704333/… and stackoverflow.com/questions/34347448/…), but otherwise you're limited by the API (stackoverflow.com/questions/29383578/…). The only reasonable improvement here is to replace `Row(c1.toInt,c2....)` with Row.fromSeq and simply add a guard expression: case xs if xs.size == 20 => Row.fromSeq(xs) Commented Nov 12, 2018 at 16:19
  • Thanks for the response. I also want the count of bad rows (rows containing < 20 columns). If I use case xs if xs.size == 20 => Row.fromSeq(xs), I still have to typecast some of the values, like c1.toLong (see the sketch after these comments). Commented Nov 12, 2018 at 16:56
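Putting the two comments together, a sketch of the CSV-reader route from the linked answers could look like this (same schema object and config key as in the question):

val df = SparkConfig.spark.read
  .schema(inputFileSchema)
  .option("mode", "DROPMALFORMED")   // drops rows that do not match the schema
  .csv(CommonUtils.loadConf.getString("conf.inputFile"))

And a sketch of the RDD route with a guarded Row.fromSeq, an accumulator for the bad-row count, and a targeted cast (casting only the first field to Long is an assumption about inputFileSchema; the accumulator value is only reliable after an action has run):

import org.apache.spark.sql.Row

val badCount = SparkConfig.spark.sparkContext.longAccumulator("badRows")

val rowRdd = splitRDD.flatMap {
  case row if row.size == 20 =>
    // cast only the fields that need it, keep the rest as strings
    Some(Row.fromSeq(row.toSeq.updated(0, row.getString(0).toLong)))
  case _ =>
    badCount.add(1)
    None
}

val ds = SparkConfig.spark.createDataFrame(rowRdd, inputFileSchema)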

