
I used rdd.collect() to create an Array, and now I want to use this Array[String] to create a DataFrame. My test file is in the following format (separated by a pipe |).

TimeStamp
IdC
Name
FileName
Start-of-fields
column01  
column02 
column03 
column04 
column05 
column06 
column07 
column08 
column010 
column11
End-of-fields
Start-of-data 
G0002B|0|13|IS|LS|Xys|Xyz|12|23|48|  
G0002A|0|13|IS|LS|Xys|Xyz|12|23|45|  
G0002x|0|13|IS|LS|Xys|Xyz|12|23|48|  
G0002C|0|13|IS|LS|Xys|Xyz|12|23|48|
End-of-data
document  

The column names are between Start-of-fields and End-of-fields. I want to split each pipe-separated ("|") data row and store the values in separate columns of a DataFrame,

like the example below:

column01  column02 column03 column04 column05 column06 column07 column08 column010 column11
G0002C      0        13       IS       LS       Xys      Xyz     12        23         48
G0002x      0        13       LS       MS       Xys      Xyz     14        300        400

My code:

    val rdd = sc.textFile("the above text file")

    val columns = rdd.collect.slice(5,16).mkString(",") // it will hold the column names

    val data = rdd.collect.slice(5,16)
    val rdd1 = sc.parallelize(rdd.collect())
    val df = rdd1.toDF(columns)

but this is not giving me the desired DataFrame shown above.

  • You can transform the initial RDD to a DataFrame. There is no need to collect the RDD. Commented Nov 24, 2020 at 9:02
  • Can you paste the whole code here so that it gives an idea of what you are trying to do? Commented Nov 24, 2020 at 9:11
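
As the first comment suggests, the initial RDD can be transformed into a DataFrame directly, without collecting the data rows to the driver. A minimal sketch, assuming the file layout shown above (column names between the field markers, data rows between the data markers) and an active Spark session providing spark and sc:

import spark.implicits._

val lines = sc.textFile("the above text file")

// The header section is tiny, so taking it to the driver is cheap.
val colNames = lines.take(15).slice(5, 15).map(_.trim)   // column01 .. column11

// Keep only the pipe-separated data rows; every other line is metadata.
// Each row is trimmed, the trailing pipe is dropped, and the fields are split.
// Assumes exactly 10 fields per row, as in the sample file.
val df = lines
  .filter(_.contains("|"))
  .map(_.trim.stripSuffix("|").split("\\|").map(_.trim))
  .map { case Array(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) =>
    (c1, c2, c3, c4, c5, c6, c7, c8, c9, c10)
  }
  .toDF(colNames: _*)

df.show(false)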

2 Answers


Could you try this?

import spark.implicits._ // Add to use `toDS()` and `toDF()`

val rdd = sc.textFile("the above text file")
    
val columns = rdd.collect.slice(5, 15).map(_.trim) // the 10 column names; `.mkString(",")` is not needed

val dataDS = rdd.collect.slice(17, 21)       // the four pipe-separated data rows
  .map(_.trim())                           // to remove whitespaces
  .map(s => s.substring(0, s.length - 1))  // to remove last pipe '|'
  .toSeq
  .toDS

val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .csv(dataDS)
  .toDF(columns: _*)

df.show(false)
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
|column01|column02|column03|column04|column05|column06|column07|column08|column010|column11|
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
|G0002B  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
|G0002A  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |45      |
|G0002x  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
|G0002C  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+

Calling the spark.read...csv() method without a schema can take a long time on huge data because of schema inference (e.g. Additional reading).

In that case, you can specify the schema like below.

/*
  column01 STRING,
  column02 STRING,
  column03 STRING,

  ...
*/
val schema = columns
  .map(c => s"$c STRING")
  .mkString(",\n")


val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .schema(schema)  // schema inference does not occur
  .csv(dataDS)
// .toDF(columns: _*) => unnecessary when schema is specified
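
If a programmatic schema is preferred over the DDL string, the same thing can be expressed with StructType. A small sketch (not part of the original answer), reusing the columns and dataDS values defined above:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Same schema as the DDL string above, built programmatically; inference is skipped either way.
val structSchema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))

val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .schema(structSchema)
  .csv(dataDS)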



If the number of columns and the column names are fixed, then you can do it as below:

import spark.implicits._
import org.apache.spark.sql.functions._

val columns = rdd.collect.slice(5, 15).mkString(",") // it will hold the column names
val data = rdd.collect.slice(17, 21)                 // the pipe-separated data rows
val d = data.toSeq.toDF()                            // one row per line, in a column named "value"

val dd = d
  .withColumn("columnX", split($"value", "\\|"))
  .withColumn("column1", $"columnX".getItem(0))
  .withColumn("column2", $"columnX".getItem(1))
  .withColumn("column3", $"columnX".getItem(2))
  .withColumn("column4", $"columnX".getItem(3))
  .withColumn("column5", $"columnX".getItem(4))
  .withColumn("column6", $"columnX".getItem(5))
  .withColumn("column7", $"columnX".getItem(6))
  .withColumn("column8", $"columnX".getItem(7))
  .withColumn("column10", $"columnX".getItem(8))
  .withColumn("column11", $"columnX".getItem(9))
  .drop("columnX", "value")

display(dd)

You can see the output in the attached screenshot.

2 Comments

Thanks @Nikunj, this was helpful. I tried the code below and it worked: ~~~ val data_frame = rdd.map(f => f.split("//|")).map( f=> (f(0),f(1),f(2),f(3),f(4),f(5),f(6),f(7),f(8),f(9),f(10),f(11))).toDF ~~~ I was not able to use the columns val for giving the column names, so I had to hardcode them.
Happy to hear. Can you upvote the answer if it helped you?
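
For reference, a small sketch of how the parsed names could be applied instead of hardcoding them: split the comma-joined columns string from the question back into individual names and use them to rename the positional columns. The colNames and named variables are hypothetical, and the number of names must match the number of columns:

// Hypothetical follow-up to the comment above, not part of the original thread.
// `columns` is the comma-joined header string from the question; `data_frame`
// is the DataFrame built in the comment. toDF(...) renames the positional
// columns, so the number of names must equal the number of columns.
val colNames = columns.split(",").map(_.trim)
val named = data_frame.toDF(colNames: _*)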
