
I have reframed the question with more details.

I have a DataFrame "dailyshow". Its schema is:

scala> dailyshow.printSchema
root
 |-- year: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- showdate: string (nullable = true)
 |-- group: string (nullable = true)
 |-- guest: string (nullable = true)

Sample data:

scala> dailyshow.show(5)
+----+------------------+---------+------+----------------+
|year|        occupation| showdate| group|           guest|
+----+------------------+---------+------+----------------+
|1999|             actor|1/11/1999|Acting|  Michael J. Fox|
|1999|          Comedian|1/12/1999|Comedy| Sandra Bernhard|
|1999|television actress|1/13/1999|Acting|   Tracey Ullman|
|1999|      film actress|1/14/1999|Acting|Gillian Anderson|
|1999|             actor|1/18/1999|Acting|David Alan Grier|
+----+------------------+---------+------+----------------+

The code below transforms the data and returns the top 5 occupations between "01/11/1999" and "06/11/1999":

scala> dailyshow.
    withColumn("showdate",to_date(unix_timestamp(col("showdate"),"MM/dd/yyyy").
    cast("timestamp"))).
    where((col("showdate") >= "1999-01-11") and (col("showdate") <= "1999-06-11")).
    groupBy(col("occupation")).agg(count("*").alias("count")).
    orderBy(desc("count")).
    limit(5).show
+------------------+-----+
|        occupation|count|
+------------------+-----+
|             actor|   29|
|           actress|   20|
|          comedian|    4|
|television actress|    3|
| stand-up comedian|    2|
+------------------+-----+

My question is: how do I write the same logic and get the same result using an RDD?

scala> dailyshow.first
res12: org.apache.spark.sql.Row = [1999,actor,1/11/1999,Acting,Michael J. Fox]

So far, I have used SimpleDateFormat to parse the string into a date.

Below is the code:

val format = new java.text.SimpleDateFormat("MM/dd/yyyy")

dailyshow.
  map(x => x.mkString(",")).    // Row -> comma-separated String
  map(x => x.split(",")).       // String -> Array[String] (round-trips the Row)
  map(x => format.parse(x(2))).first // returns Mon Jan 11 00:00:00 PST 1999
  • What does dailyshow contain? And your first two maps look like they cancel each other. Commented Jul 13, 2017 at 11:13
  • @philantrovert dailyshow is a DataFrame. Its contents are [year: int, occupation: string, showdate: string, group: string, guest: string]. The first two maps convert each Row to plain fields: a Row prints wrapped in "[" and "]", and round-tripping through mkString and split avoids having to strip those two characters. (A more direct conversion is sketched after these comments.) Commented Jul 13, 2017 at 11:25
  • Sample data in dailyshow and the expected output would be of great help to answerers. Please add them if you can, thanks. Commented Jul 13, 2017 at 11:27
  • I have reframed the question. Kindly help. Commented Jul 13, 2017 at 12:59
  • So just so I understand: you want to repeat the process above using an RDD instead of a DataFrame? May I ask why? Commented Jul 13, 2017 at 15:11
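
The round trip discussed in the comments above can be avoided entirely: a DataFrame exposes its underlying RDD[Row] via .rdd, and Row has typed getters. A minimal sketch, assuming the dailyshow DataFrame from the question:

import java.text.SimpleDateFormat

val format = new SimpleDateFormat("MM/dd/yyyy")

// .rdd yields an RDD[Row]; getString(2) reads the showdate column directly,
// so no mkString/split round trip is needed
dailyshow.rdd.map(row => format.parse(row.getString(2))).first
// Mon Jan 11 00:00:00 PST 1999, matching the question's output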

2 Answers


If I were you, I would use Spark's built-in date functions, as defined in org.apache.spark.sql.functions, instead of doing it manually with SimpleDateFormat and map. Using the DataFrame functions is simpler, more idiomatic, and less error prone, and it performs better because Catalyst can optimize them, whereas an RDD lambda is opaque to the optimizer.

Let's assume you have a DataFrame df with a column called dateString that contains date strings in the format MM/dd/yyyy.

Let's also assume you want to convert it to a date in order to extract the year, and then display it in the format yyyy.MMMMM.dd.

What you can do is:

import org.apache.spark.sql.functions._

val dfWithDate = df.withColumn("date", to_date(unix_timestamp($"dateString", "MM/dd/yyyy").cast("timestamp")))
val dfWithYear = dfWithDate.withColumn("year", year($"date"))
val dfWithOutput = dfWithYear.withColumn("dateOutput", date_format($"date", "yyyy.MMMMM.dd"))

Now the year column contains the year and the dateOutput column contains the string representation in your format.
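
For a quick sanity check, here is a minimal runnable sketch, assuming a Spark 2.x shell with implicits in scope; the df below is a hypothetical stand-in built from the question's sample dates:

import org.apache.spark.sql.functions._

// hypothetical two-row input, for illustration only
val df = Seq("1/11/1999", "6/11/1999").toDF("dateString")

val out = df.
  withColumn("date", to_date(unix_timestamp($"dateString", "MM/dd/yyyy").cast("timestamp"))).
  withColumn("year", year($"date")).
  withColumn("dateOutput", date_format($"date", "yyyy.MMMMM.dd"))

out.show()
// expect year = 1999 and dateOutput like "1999.January.11"; the exact month
// text for the MMMMM pattern depends on the date formatter Spark uses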


3 Comments

Thanks for the reply, but I am working on an RDD, and these functions don't work on RDDs.
@Jayson You begin with a DataFrame and convert it to an RDD. Why do you need to work on the RDD directly from the beginning? If you provide a more thorough example of what you want (i.e. some example input for the original DataFrame and how you plan to use the result), then it would be possible to improve the answer.
I have reframed the question. Kindly help.

Got a lot of deprecation warnings while writing this :D

So we have this data in an RDD:

val rdd = sc.parallelize(Array(
     Array("1999","actor","1/11/1999","Acting","  Michael J. Fox"),
     Array("1999","Comedian","1/12/1999","Comedy"," Sandra Bernhard"),
     Array("1999","television actress","1/13/1999","Acting","Tracey Ullman"),
     Array("1999","film actress","1/14/1999","Acting","Gillian Anderson"),
     Array("1999","actor","1/18/1999","Acting","David Alan Grier")))

Then, as per your question, we filter on the date:

// format is the same SimpleDateFormat("MM/dd/yyyy") as in the question
val format = new java.text.SimpleDateFormat("MM/dd/yyyy")

val filtered = rdd.filter { x =>
    format.parse(x(2)).after(new java.util.Date("01/10/1999")) &&
    format.parse(x(2)).before(new java.util.Date("01/14/1999"))
}

Then we get this:

Array[Array[String]] = Array(
Array(1999, actor, 1/11/1999, Acting, "  Michael J. Fox"), 
Array(1999, Comedian, 1/12/1999, Comedy, " Sandra Bernhard"), 
Array(1999, television actress, 1/13/1999, Acting, Tracey Ullman))

Then we group them with the second element as the key and count the number of occurrences:

filtered.keyBy(x => x(1)).map((_, 1)).reduceByKey(_ + _).map { case ((a, b), c) => (a, c) }

If everything goes right, you should get:

Array[(String, Int)] = Array((television actress,1), (Comedian,1), (actor,1))

2 Comments

Thanks, it worked. However, I modified the code for grouping and used the following: filtered.map(x => (x(1), 1)).reduceByKey(_ + _).take(5)
Oh great. That's better and less complex.
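
Putting the pieces together, here is a minimal sketch of a full RDD equivalent of the DataFrame query in the question. Note that take(5) alone does not guarantee the five largest counts, so this version uses takeOrdered with a descending ordering (one option among several); the column positions and date bounds are taken from the question:

import java.text.SimpleDateFormat

val format = new SimpleDateFormat("MM/dd/yyyy")
// inclusive bounds matching the DataFrame version's where clause
val start = format.parse("01/11/1999")
val end   = format.parse("06/11/1999")

val top5 = dailyshow.rdd.
  map(row => (row.getString(1), format.parse(row.getString(2)))). // (occupation, showdate)
  filter { case (_, d) => !d.before(start) && !d.after(end) }.    // keep dates in [start, end]
  map { case (occ, _) => (occ, 1) }.
  reduceByKey(_ + _).
  takeOrdered(5)(Ordering.by { case (_, count) => -count })       // five largest counts

top5.foreach(println) // e.g. (actor,29), (actress,20), ...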
