
I have the following datasets:

Table1 (department)

id | deptname
---+---------
 1 | physics
 2 | computer

Table2 (employee)

employeid | empname | departmentid
----------+---------+-------------
        1 | A       | 1
        2 | B       | 1
        3 | C       | 2
        4 | D       | 2

Now I would like to get the dataset below. I've tried a left outer join on Table1.id == Table2.departmentid, but I am not getting the desired output.

Desired output:

id | deptname | emplist
---+----------+-----------------------
 1 | physics  | [[1, A, 1], [2, B, 1]]
 2 | computer | [[3, C, 2], [4, D, 2]]
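What I tried, roughly (a minimal sketch; table1_df and table2_df stand in for my actual DataFrames):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val table1_df = Seq((1, "physics"), (2, "computer")).toDF("id", "deptname")
val table2_df = Seq((1, "A", 1), (2, "B", 1), (3, "C", 2), (4, "D", 2)).
  toDF("employeid", "empname", "departmentid")

// The join itself succeeds, but it yields one flat row per employee
// instead of one row per department with a nested employee list.
val joined = table1_df.join(table2_df,
  table1_df("id") === table2_df("departmentid"), "left_outer")
joined.show()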

Later, I need to use this table to derive several counts and to convert the data into XML. I will be doing this conversion using map.

Any help would be appreciated.


2 Answers


Joining alone is not enough to get the desired output. You are probably missing a piece: the last element of each nested array appears to be the departmentid. Assuming the last element of the nested array is departmentid, I generated the output the following way:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.collect_list

case class department(id: Integer, deptname: String)
case class employee(employeid: Integer, empname: String, departmentid: Integer)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val department_df = Seq(department(1, "physics"),
                        department(2, "computer")).toDF()
val employee_df = Seq(employee(1, "A", 1),
                      employee(2, "B", 1),
                      employee(3, "C", 2),
                      employee(4, "D", 2)).toDF()

// Join, pack each employee row into an array, then collect the arrays
// per department. The Row pattern assumes every department has at least
// one employee; an unmatched department would produce nulls and a
// MatchError here.
val result = department_df.join(employee_df, department_df("id") === employee_df("departmentid"), "left").
      selectExpr("id", "deptname", "employeid", "empname").
      rdd.map {
        case Row(id: Integer, deptname: String, employeid: Integer, empname: String) =>
          (id, deptname, Array(employeid.toString, empname, id.toString))
      }.toDF("id", "deptname", "arrayemp").
      groupBy("id", "deptname").
      agg(collect_list("arrayemp").as("emplist")).
      orderBy("id", "deptname")

The output looks like this:

result.show(false)
+---+--------+----------------------+
|id |deptname|emplist               |
+---+--------+----------------------+
|1  |physics |[[2, B, 1], [1, A, 1]]|
|2  |computer|[[4, D, 2], [3, C, 2]]|
+---+--------+----------------------+

Explanation: if I break the last dataframe transformation down into multiple steps, it should become clear how the output is generated.

Left outer join between department_df and employee_df:

val df1 = department_df.join(employee_df, department_df("id") === employee_df("departmentid"), "left").
      selectExpr("id", "deptname", "employeid", "empname")
df1.show()
+---+--------+---------+-------+
| id|deptname|employeid|empname|
+---+--------+---------+-------+
|  1| physics|        2|      B|
|  1| physics|        1|      A|
|  2|computer|        4|      D|
|  2|computer|        3|      C|
+---+--------+---------+-------+

Creating an array from some of df1's column values:

val df2 = df1.rdd.map {
        case Row(id: Integer, deptname: String, employeid: Integer, empname: String) =>
          (id, deptname, Array(employeid.toString, empname, id.toString))
      }.toDF("id", "deptname", "arrayemp")
df2.show()
+---+--------+---------+
| id|deptname| arrayemp|
+---+--------+---------+
|  1| physics|[2, B, 1]|
|  1| physics|[1, A, 1]|
|  2|computer|[4, D, 2]|
|  2|computer|[3, C, 2]|
+---+--------+---------+

Aggregating the arrays of df2 into one list per group:

val result = df2.groupBy("id", "deptname").
              agg(collect_list("arrayemp").as("emplist")).
              orderBy("id", "deptname")
result.show(false)
+---+--------+----------------------+
|id |deptname|emplist               |
+---+--------+----------------------+
|1  |physics |[[2, B, 1], [1, A, 1]]|
|2  |computer|[[4, D, 2], [3, C, 2]]|
+---+--------+----------------------+

Another way, building the employee array with DataFrame functions instead of an RDD map:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._

val df = spark.sparkContext.parallelize(Seq(
  (1, "Physics"),
  (2, "Computer"),
  (3, "Maths")
)).toDF("ID", "Dept")

val schema = List(
  StructField("EMPID", IntegerType, true),
  StructField("EMPNAME", StringType, true),
  StructField("DeptID", IntegerType, true)
)

val data = Seq(
  Row(1, "A", 1),
  Row(2, "B", 1),
  Row(3, "C", 2),
  Row(4, "D", 2),
  Row(5, "E", null)
)

val df_emp = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

// Pack each employee's columns into an array, then collect the arrays
// per department.
val newdf = df_emp.withColumn("CONC", array($"EMPID", $"EMPNAME", $"DeptID")).
  groupBy($"DeptID").
  agg(expr("collect_list(CONC) as emplist"))

// Inner join: departments with no employees (Maths) and employees with
// no department (E) are dropped. Note the join column is taken from
// newdf, not df_emp, since newdf is the DataFrame being joined.
df.join(newdf, df.col("ID") === newdf.col("DeptID")).
  select($"ID", $"Dept", $"emplist").show()

+---+--------+--------------------+
| ID|    Dept|             emplist|
+---+--------+--------------------+
|  1| Physics|[[1, A, 1], [2, B...|
|  2|Computer|[[3, C, 2], [4, D...|
+---+--------+--------------------+
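If departments without employees should survive (Maths here), switching to a left join keeps them; a minimal sketch reusing df and newdf from above:

// Left join: Maths is kept, with null in its emplist column.
df.join(newdf, df.col("ID") === newdf.col("DeptID"), "left").
  select($"ID", $"Dept", $"emplist").show()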

