
I have a Spark DataFrame:

Level    Hierarchy   Code
--------------------------
Level1  Hier1        1
Level1  Hier2        2
Level1  Hier3        3
Level1  Hier4        4
Level1  Hier5        5
Level2  Hier1        1
Level2  Hier2        2
Level2  Hier3        3  

I need to convert this to a single Map of type Map[String, Map[Int, String]], i.e.

Map("Level1" -> Map(1 -> "Hier1", 2 -> "Hier2", 3 -> "Hier3", 4 -> "Hier4", 5 -> "Hier5"),
    "Level2" -> Map(1 -> "Hier1", 2 -> "Hier2", 3 -> "Hier3"))

Please suggest a suitable approach to achieve this functionality.

My attempt is below. It works, but it's ugly (and it rescans the DataFrame once per level):

val level_code_df = master_df.select("Level", "Hierarchy", "Code").distinct()
val hierarchy_names = level_code_df.select("Level").distinct().collect()

var hierarchyMap: scala.collection.mutable.Map[String, scala.collection.mutable.Map[Int, String]] =
  scala.collection.mutable.Map()

for (i <- 0 until hierarchy_names.size) {
  println("names:" + hierarchy_names(i)(0))
  val name = hierarchy_names(i)(0).toString

  // One full pass over the DataFrame per level: emit a single-entry map for
  // matching rows, an empty map otherwise, then merge everything with ++.
  val code_level_map = level_code_df.rdd.map { row =>
    if (name.equals(row.getAs[String]("Level")))
      Map(row.getAs[String]("Code").toInt -> row.getAs[String]("Hierarchy"))
    else
      Map[Int, String]()
  }.reduce(_ ++ _)

  hierarchyMap += (name -> (collection.mutable.Map() ++ code_level_map))
}

2 Answers


You can use groupByKey(_.level) on a Dataset, followed by mapGroups. Don't forget to also provide a Kryo encoder for the resulting map type:

case class Data(level: String, hierarchy: String, code: Int)

// In spark-shell the implicits are already in scope; in an application,
// bring the session implicits into scope first (import spark.implicits._
// on Spark 2.x) so that .toDS resolves.
val data = Seq(
  Data("Level1", "Hier1", 1),
  Data("Level1", "Hier2", 2),
  Data("Level1", "Hier3", 3),
  Data("Level1", "Hier4", 4),
  Data("Level1", "Hier5", 5),
  Data("Level2", "Hier1", 1),
  Data("Level2", "Hier2", 2),
  Data("Level2", "Hier3", 3)).toDS

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Map[Int, String]]]

Spark 2.0+:

data.groupByKey(_.level).mapGroups {
  case (level, values) => Map(level -> values.map(v => (v.code, v.hierarchy)).toMap)
}.collect()
//Array[Map[String,Map[Int,String]]] = Array(Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4)), Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3)))

Spark 1.6+:

data.rdd.groupBy(_.level).map {
  case (level, values) => Map(level -> values.map(v => (v.code, v.hierarchy)).toMap)
}.collect()
//Array[Map[String,Map[Int,String]]] = Array(Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3)), Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4)))
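Note that both variants return an Array of single-entry maps. If you want the single combined Map[String, Map[Int, String]] from the question, a minimal sketch is to fold the collected maps together on the driver (hierarchyMap is just an illustrative name):

// Fold the collected single-entry maps into one nested map on the driver.
val hierarchyMap: Map[String, Map[Int, String]] =
  data.groupByKey(_.level).mapGroups {
    case (level, values) => Map(level -> values.map(v => (v.code, v.hierarchy)).toMap)
  }.collect()
   .foldLeft(Map.empty[String, Map[Int, String]])(_ ++ _)

hierarchyMap("Level1")(3)  // "Hier3"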

2 Comments

Thanks for your response. I hope your code corresponds to Spark 2.0+. Will this code work in Spark 1.6?
Updated the answer for Spark 1.6. It's almost the same; you just need to convert to an RDD first.

@prudenko's answer is probably the most concise, and it should work with Spark 1.6 or later. But if you're looking for a solution that stays with the DataFrame API (rather than Datasets), here's one using a simple UDF:

import scala.collection.mutable
import org.apache.spark.sql.functions.{collect_list, map, udf}
// Assumes spark.implicits._ (sqlContext.implicits._ on 1.6) is in scope for $.

// Merges the per-row single-entry maps gathered by collect_list for each Level.
val mapCombiner = udf[Map[Int, String], mutable.WrappedArray[Map[Int, String]]] { _.reduce(_ ++ _) }

val result: Map[String, Map[Int, String]] = df
  .groupBy("Level")
  .agg(collect_list(map($"Code", $"Hierarchy")) as "Maps")  // assumes Code is an integer column
  .select($"Level", mapCombiner($"Maps") as "Combined")
  .rdd.map(r => (r.getAs[String]("Level"), r.getAs[Map[Int, String]]("Combined")))
  .collectAsMap()
  .toMap  // collectAsMap returns scala.collection.Map; .toMap makes it immutable

Note that this will perform badly (or OOM) if there can be thousands of distinct values for a single key (i.e., per Level value). But since you're collecting everything into driver memory anyway, that's probably either not an issue, or a problem no approach can avoid.
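And since everything ends up on the driver anyway, here's a plain-Scala sketch that skips the UDF entirely (assuming the distinct (Level, Hierarchy, Code) rows fit in driver memory and Code is an integer column; localMap is an illustrative name):

// Collect the distinct rows, then group and nest them locally on the driver.
val localMap: Map[String, Map[Int, String]] = df
  .select("Level", "Hierarchy", "Code").distinct()
  .collect()
  .groupBy(_.getAs[String]("Level"))
  .map { case (level, rows) =>
    level -> rows.map(r => r.getAs[Int]("Code") -> r.getAs[String]("Hierarchy")).toMap
  }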
