1

I'm converting present Sql Querys to DataFrames using Spark-scala I had Query where I had Multiple Inner Joins to be performed.Actually I can Implement in SqlContext.sql("") but my team is not interested in sqlContext want to perform operations on top of data frames

si s inner join 
ac a on s.cid = a.cid and s.sid =a.sid
inner join De d on s.cid = d.cid AND d.aid = a.aid 
inner join SGrM sgm on s.cid = sgm.cid and s.sid =sgm.sid and sgm.status=1
inner join SiGo sg on sgm.cid =sg.cid and sgm.gid =sg.gid 
inner join bg bu on s.cid = bu.cid and s.sid =bu.sid
inner join ls al on a.AtLId = al.lid
inner join ls rl on a.RtLId = rl.lid
inner join ls vl on a.VLId = vl.lid

From My searching I got to know we can recursively join using

List(df1,df2,df3,dfN).reduce((a, b) => a.join(b, joinCondition))

But I cant satisfy above condition since there are multiple Conditions involved How can I perform this?

0

3 Answers 3

2

You can join multiple dataframes with multiple conditions like below:

val result = df1.as("df1").join(df2.as("df2"), 
              $"df1.col1"===$df2.col1" && $"df1.col2"===$df2.col2").join(df3.as("df3"), 
              $"df3.col1"===$df2.col1" && $"df3.col2"===$df2.col2", "left_outer")
Sign up to request clarification or add additional context in comments.

Comments

1

First of all, replace DataFrames with DataSet and Spark 2.+ to enable better performance by avoiding JVM objects - re project Tungsten.

Now, to your question: Lets say you have 4 x DS as:

First create schema for your tables:

case class DS (id: Int, colA: String)

Then read files with optimisation enabled:

 val ds1 = spark.read.parquet("X1").as[DS]

 val ds2 = spark.read.parquet("X2").as[DS]

 val ds3 = spark.read.parquet("X3").as[DS]

 val ds4 = spark.read.parquet("X4").as[DS]

Now, you can join them one by one so that you can follow the data flow (only use broadcast when you have small table):

case class JoinedDS (colB: String)


val joinedDS = ds1.join(broadcast(ds2), Seq("id"), "inner")
.join(ds3, Seq("id", "colB"), "inner")
.join(ds4, Seq("id"), "inner")
.select(col("colB") 
.as[JoinedDS]

2 Comments

Thanks I had a Question Which will be give the performance when compared to using sql("") and write sql queries or Datasets/DataFrames operations
as long as you ensure that you pass DataSet Objects at all times, you will notice better performance, pls monitor each objects when it is created if it has not been transformed to Dataframe, you might need to explicitly pass column names types ex select(col("as").as[String]), as long you do that spark will take care of optimization for you, when compared to sql you are definitely using untyped Dataset re dataFrame hence no optimizations
0

Below is an example of joining six tables/dataframes (not using SQL)

retail_db is a well known sample DB, anyone can get it from Google

Problem: //Get all customers from TX who bought fitness items

 val df_customers = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "customers").option("user", "root").option("password", "root").load()
  val df_products = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products").option("user", "root").option("password", "root").load() 
  val df_orders = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders"). option("user", "root").option("password", "root").load()
  val df_order_items = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "root").option("password", "root").load()
  val df_categories = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "categories").option("user", "root").option("password", "root").load()
  val df_departments = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "departments").option("user", "root").option("password", "root").load()
  val df_order_items_all = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_all").option("user", "root").option("password", "root").load()



  val jeCustOrd=df_customers.col("customer_id")===df_orders.col("order_customer_id")
  val jeOrdItem=df_orders.col("order_id")===df_order_items.col("order_item_order_id")
  val jeProdOrdItem=df_products.col("product_id")===df_order_items.col("order_item_product_id")
  val jeProdCat=df_products.col("product_category_id")===df_categories.col("category_id")
  val jeCatDept=df_categories.col("category_department_id")===df_departments.col("department_id")
  // val jeOrdItem=df_orders.col("")===df_order_items.col("")



  //Get all customers from TX who bought fitness items
  df_customers.where("customer_state = 'TX'").join(df_orders,jeCustOrd).join(df_order_items,jeOrdItem).join(df_products,jeProdOrdItem).join(df_categories,jeProdCat).join(df_departments,jeCatDept).filter("department_name='Fitness'")
  .select("customer_id","customer_fname","customer_lname", "customer_street","customer_city","customer_state","customer_zipcode","order_id","category_name","department_name").show(5)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.