
I am trying to read big files in Spark Scala and perform a join on them. When I test with small files it works very well, but with bigger files I sometimes get the error below.

I managed to pull out one of the files for which I was getting the error. The file size is 1 GB, and the error is thrown at the end, while creating the partitions, where I split each line to get the columns.

The error occurs right after these lines:

val rdd = sc.textFile(mainFileURL)
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
println(schema)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

This is the culprit line:

val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

Please suggest how I can handle this.

When I do rdd.count I get a value, but when I do data.count() I get the error:

Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 37
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, uniqueFundamentalSet), StringType), true) AS uniqueFundamentalSet#0

Here is my sample data set

uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
  • Please post the exact line of code where the exception is thrown. Commented Mar 20, 2018 at 13:11
  • @RameshMaharjan It is in the dataMain.join(latestForEachKey Commented Mar 20, 2018 at 13:25
  • You must have a mistake, as there is no code that could give you an ArrayIndexOutOfBoundsException on that line. Commented Mar 20, 2018 at 13:26
  • The full stacktrace of the ArrayIndexOutOfBoundsException would be helpful, if you have it. Commented Mar 20, 2018 at 13:57
  • Nope, that's not the part where the error exists. That's the part where you perform the action of writing data to the output path. In Spark, all the transformations happen when an action is performed, so if you do some action before that part it may seem that the error occurred before it. Keep debugging. Commented Mar 20, 2018 at 18:36

1 Answer


Filter out the rows which don't match

One of the easiest ways is to filter out all the rows that don't match the length of the schema before applying the schema to form a DataFrame:

val requiredNumberOfFields = schema.fieldNames.length   // number of columns required by the schema
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .filter(_.length == requiredNumberOfFields)    // keep only the rows with as many fields as the schema requires
      .map(x => Row.fromSeq(x.toSeq))
    , schema)
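As a side note, one frequent cause of short rows with data like this is that Java's String.split (which Scala uses) drops trailing empty strings by default. If some rows in the file end with empty fields, passing a negative limit to split preserves them, which may rescue rows the filter above would otherwise discard. A minimal sketch of the difference (the sample value is made up):

val sample = "a|^|b|^||^|"            // a row ending in two empty fields
sample.split("\\|\\^\\|")             // Array(a, b) -- trailing empty fields are dropped
sample.split("\\|\\^\\|", -1)         // Array(a, b, "", "") -- all fields are kept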

Add dummy strings or drop the extra strings

You can write a function to check the length. If a row has fewer fields than the schema, you can pad it with dummy strings; if it has more, you can drop the extra fields:

val requiredNumberOfFields = schema.fieldNames.length
def appendDummyData(row: Array[String], len: Int): Array[String] =
  if (row.length == len) row
  else if (row.length < len) row ++ Array.fill(len - row.length)("dummy")   // pad short rows with dummy values
  else row.take(len)                                                        // drop the extra fields
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .map(x => Row.fromSeq(appendDummyData(x, requiredNumberOfFields).toSeq))   // pad or truncate each row to the schema length
    , schema)
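For illustration, the helper pads short rows and truncates long ones (the values here are made up):

appendDummyData(Array("a", "b"), 4)             // Array(a, b, dummy, dummy) -- padded with dummies
appendDummyData(Array("a", "b", "c", "d"), 2)   // Array(a, b) -- extra fields dropped
appendDummyData(Array("a", "b"), 2)             // Array(a, b) -- unchanged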

I hope the answer is helpful.


Comments

  • So what's the problem? Is the issue the delimiter? Can we somehow find which lines are creating the problem? I was looking at the same kind of solution that you have posted. This solution is working.
  • You can filter the mismatching rows into a separate variable and print them, or store them for further analysis. I am going to give you another solution that adds dummy strings, so that when the DataFrame is created you can analyze them.
  • When I try to find the mismatched rows with `.filter(_.length == requiredNumberOfFields)`, I still get the error. I am not able to find exactly which line has the mismatch.
  • What error are you getting? And did you try the other method I just updated?
  • Do `val notmatching = rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).filter(_.length != requiredNumberOfFields)` and print notmatching to see the errored lines, or save it to a file using saveAsTextFile and then analyze those lines for the cause of the errors (see the formatted snippet below).
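For reference, here is the suggestion from the last comment as a ready-to-run snippet, using rdd and requiredNumberOfFields from above (the output path is a placeholder):

val notMatching = rdd
  .filter(!_.contains("uniqueFundamentalSet"))
  .map(line => line.split("\\|\\^\\|"))
  .filter(_.length != requiredNumberOfFields)   // rows whose field count differs from the schema

notMatching
  .map(_.mkString("|^|"))                       // re-join the fields so each bad row prints as one line
  .saveAsTextFile("/tmp/notMatchingRows")       // placeholder path; inspect these lines for the cause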
