I have data like:

[Michael, 100, Montreal,Toronto, Male,30, DB:80, Product:DeveloperLead]
[Will, 101, Montreal, Male,35, Perl:85, Product:Lead,Test:Lead]
[Steven, 102, New York, Female,27, Python:80, Test:Lead,COE:Architect]
[Lucy, 103, Vancouver, Female,57, Sales:89,HR:94, Sales:Lead]

So I have to read this data and define a case class using Spark. I have written the below program, but I get an error while converting the case class to a data frame. What's wrong in my code, and how can I correct it?

case class Ayush(name: String, employee_id: String, work_place: Array[String], sex_age: Map[String, String], skills_score: Map[String, String], depart_title: Map[String, Array[String]])

I get an error on the line below:

val d = df.map(w=> Ayush(w(0),w(1),w(2)._1,w(2)._2,w(3)._1,w(3)._2,w(4)._1,w(4)._2,w(5)._1,w(5)._2._1,w(5)._2._2))).toDF


  • If you have a case class with arrays and Maps, then you need to use that as input as well. Commented Aug 15, 2018 at 2:45
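In other words, the Row values inside map are untyped (Any), not tuples, so expressions like w(2)._1 cannot work; each field has to be converted or parsed explicitly into the Array and Map fields of the case class. A minimal sketch of that idea, assuming df holds the six raw fields as string columns in the same w(0)..w(5) order used above (the parsing details are illustrative):

import spark.implicits._ // provides the Encoder needed to map rows into Ayush

val fixed = df.map { w =>
  val sa = w.getString(3).split(",") // e.g. "Male,30"
  val sk = w.getString(4).split(":") // e.g. "DB:80"
  val dt = w.getString(5).split(":") // e.g. "Product:DeveloperLead"
  Ayush(w.getString(0), w.getString(1), w.getString(2).split(","),
    Map(sa(0) -> sa(1)), Map(sk(0) -> sk(1)), Map(dt(0) -> dt))
}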

2 Answers


I have changed your data: wrap the workplace and department data in double quotes so that a single field can hold comma-separated values, and add a custom separator (here an underscore between the skill:score pairs) so that the data can be split on it later. You can use your own separator.


The data is as follows:

Michael,100," Montreal,Toronto", Male,30, DB:80," Product,DeveloperLead"
Will,101, Montreal, Male,35, Perl:85," Product,Lead,Test,Lead"
Steven,102, New York, Female,27, Python:80," Test,Lead,COE,Architect"
Lucy,103, Vancouver, Female,57, Sales:89_HR:94," Sales,Lead"

Below are the code changes I made, which worked fine for me:

val df = spark.read.csv("CSV PATH HERE")

import spark.implicits._ // needed for the Encoder used by df.map below

case class Ayush(name: String, employee_id: String, work_place: Array[String], sex_age: Map[String, String], skills_score: Map[String, String], depart_title: Map[String, Array[String]])

val resultDF = df.map { x =>
  val departTitleData = x(6).toString
  val skill_score = x(5).toString
  val skill_Map = scala.collection.mutable.Map[String, String]()
  // Skills are separated by an underscore; split each skill:score pair on ":" and add it to the map
  skill_score.split("_").foreach { s => skill_Map += (s.split(":")(0) -> s.split(":")(1)) }
  // Put the parsed fields into the case class
  Ayush(x(0).toString, x(1).toString, x(2).toString.split(","), Map(x(3).toString -> x(4).toString), skill_Map.toMap, Map(departTitleData.split(",")(0) -> departTitleData.split(",")))
}
// End here
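The table below was presumably printed with a call along these lines (resultDF being the Dataset[Ayush] built above). show() truncates long cells by default, which is why the Map columns appear cut off; show(false) would print them in full:

// Display the Dataset in tabular form (long values are truncated by default)
resultDF.show()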

The output of the code above is:


+-------+-----------+--------------------+------------------+--------------------+--------------------+
|   name|employee_id|          work_place|           sex_age|        skills_score|        depart_title|
+-------+-----------+--------------------+------------------+--------------------+--------------------+
|Michael|        100|[ Montreal, Toronto]|  Map( Male -> 30)|      Map( DB -> 80)|Map( Product -> W...|
|   Will|        101|         [ Montreal]|  Map( Male -> 35)|    Map( Perl -> 85)|Map( Product -> W...|
| Steven|        102|         [ New York]|Map( Female -> 27)|  Map( Python -> 80)|Map( Test -> Wrap...|
|   Lucy|        103|        [ Vancouver]|Map( Female -> 57)|Map(HR -> 94,  Sa...|Map( Sales -> Wra...|
+-------+-----------+--------------------+------------------+--------------------+--------------------+
  • It may not be exactly what you expected, but it may help you achieve what you are trying to do...

1 Comment

Hi, thanks a lot for your help. I will try this and check the result. It's helpful.

@vishal I don't know if this question is still valid, but here is my solution without changing the source data. Fair warning: it might be a little cringy :)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Same shape as the Ayush case class from the question
case class schema(name: String, employee_id: String, work_place: Array[String], sex_age: Map[String, String], skills_score: Map[String, String], depart_title: Map[String, Array[String]])

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("first_demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._ // needed for toDF on the RDD of case class instances

    val rdd1 = sc.textFile("file:///C:/Users/k.sandeep.varma/Downloads/documents/documents/spark_data/employee_data.txt")
    // Strip the surrounding [ and ] from every record
    val clean_rdd = rdd1.map(x => x.replace("[", "")).map(x => x.replace("]", ""))
    // Split each record on ", " and parse the fields into the case class
    val schema_rdd = clean_rdd.map(x => x.split(", ")).map(x => schema(x(0), x(1), x(2).split(","), Map(x(3).split(",")(0) -> x(3).split(",")(1)), Map(x(4).split(":")(0) -> x(4).split(":")(1)), Map(x(5).split(":")(0) -> x(5).split(":"))))
    val df1 = schema_rdd.toDF()
    df1.printSchema()
    df1.show(false)
}

Output:

+-------+-----------+-------------------+--------------+----------------+---------------------------------------+
|name   |employee_id|work_place         |sex_age       |skills_score    |depart_title                           |
+-------+-----------+-------------------+--------------+----------------+---------------------------------------+
|Michael|100        |[Montreal, Toronto]|[Male -> 30]  |[DB -> 80]      |[Product -> [Product, DeveloperLead]]  |
|Will   |101        |[Montreal]         |[Male -> 35]  |[Perl -> 85]    |[Product -> [Product, Lead,Test, Lead]]|
|Steven |102        |[New York]         |[Female -> 27]|[Python -> 80]  |[Test -> [Test, Lead,COE, Architect]]  |
|Lucy   |103        |[Vancouver]        |[Female -> 57]|[Sales -> 89,HR]|[Sales -> [Sales, Lead]]               |
+-------+-----------+-------------------+--------------+----------------+---------------------------------------+
