4

Below is my logic for adding a sequence number column to a dataframe. It works as expected when I read data from delimited files. Today I have a new task: read the data from an Oracle table, add a sequence number, and process it further. I am facing an issue with the logic below when I read from the Oracle table.

oracleTableDF is my dataframe

   //creating sequence number logic for SeqNum (zipWithIndex is 0-based, so add 1)
   val rowRDD = oracleTableDF.rdd.zipWithIndex().map { indexedRow =>
     Row.fromSeq((indexedRow._2 + 1L) +: indexedRow._1.toSeq)
   }

  //creating StructType to add Seqnum in schema
        val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields))

  //creating new Data Frame with seqnum
  oracleTableDF = spark.createDataFrame(rowRDD, newstructure)

I am not able to locate the actual issue, because the logic works as expected on the cluster when I read from files, and it also works as expected in local mode. It only fails on the cluster when I read from the Oracle table.

below is the error :

"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracleDataProcess$"

  • Is this SeqNum column like a row number? Commented Sep 11, 2017 at 7:06
  • Yes, I am adding a row number to each record. Commented Sep 11, 2017 at 7:46
  • Yes, I need to add a row number to each record. It works fine for delimited files, but I am facing this issue when I try it on the Oracle table. Commented Sep 11, 2017 at 7:52
  • You can use Window function row_number instead of monotonicallyIncreasingId but you would have to work with a single partition to generate continuous row_number. Commented Sep 11, 2017 at 8:26
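The single-partition Window approach mentioned in the last comment could be sketched like this (a minimal, self-contained example; the sample dataframe and the "SeqNum" column name are placeholders, not the asker's actual data):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

object RowNumberSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("seqnum-sketch").getOrCreate()
    import spark.implicits._

    // Placeholder dataframe standing in for oracleTableDF
    val df = Seq("a", "b", "c").toDF("value")

    // A window with no partitionBy pulls all rows into a single partition,
    // which is exactly what makes row_number produce continuous 1, 2, 3, ...
    val w = Window.orderBy("value")
    val withSeq = df.withColumn("SeqNum", row_number().over(w))

    withSeq.show()
    spark.stop()
  }
}
```

Note the trade-off the comment hints at: the single partition makes the numbering continuous but removes parallelism for that stage, so it is only practical for moderately sized tables.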

2 Answers 2

6

If all you need is to add a column to your dataframe with an auto-increment integer value, you can use monotonicallyIncreasingId which is of LongType:

val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonicallyIncreasingId)

[UPDATE]

Note that monotonicallyIncreasingId is deprecated. monotonically_increasing_id() should be used instead.


6 Comments

As per the discussion on "stackoverflow.com/questions/35705038/…", it's not a clear function, and it does not take any parameters. I need to pass some parameters to it.
I believe issue SPARK-14393 has already been resolved since Spark 2.1. You're right that monotonicallyIncreasingId doesn't take parameters.
@LeoC, is there any way to start the values from 1? Right now they start from 0.
@user4342532, monotonically_increasing_id() guarantees only generating Longs in a monotonically increasing fashion. It may start from 0 or some Long integer. If you want to have control in the id assignment, you should consider using zipWithIndex on a RDD, or row_number() Window function as illustrated in part of this blog post.
@user4342532, unfortunately monotonically_increasing_id() only guarantees the generated numbers are monotonically increasing. There is no guarantee they will be consecutive integers.
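For the 1-based requirement raised in these comments, the zipWithIndex route from the question already works; a minimal sketch (the sample dataframe is a placeholder standing in for the Oracle-backed one):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ZipWithIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("seqnum-sketch").getOrCreate()
    import spark.implicits._

    // Placeholder dataframe standing in for oracleTableDF
    val df = Seq("a", "b", "c").toDF("value")

    // zipWithIndex assigns 0-based indices, so add 1 to start the sequence at 1
    val rowRDD = df.rdd.zipWithIndex().map { case (row, idx) =>
      Row.fromSeq((idx + 1L) +: row.toSeq)
    }

    // Prepend the SeqNum field to the existing schema
    val schema = StructType(StructField("SeqNum", LongType, nullable = false) +: df.schema.fields)
    val withSeq = spark.createDataFrame(rowRDD, schema)

    withSeq.show()
    spark.stop()
  }
}
```

Unlike monotonically_increasing_id(), this guarantees consecutive values starting at 1, at the cost of an extra pass over the RDD.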
0

One option is monotonically_increasing_id(), which creates a new column with an incremental id:

val dataFrame = oracleTableDF.withColumn("incremental_id", monotonically_increasing_id())

