
I have done a transformation using sqlContext in Spark, but I want to write the same query using the Spark DataFrame API only. The query includes a join plus SQL CASE statements. The SQL query is written as below:

refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
                           "CASE "
                           "WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
                           "ELSE a.Quantity_Sold "
                           "END AS Quantity_Sold, "
                           "CASE "
                           "WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
                           "ELSE a.Vendor_ID "
                           "END AS Vendor_ID, "
                           "a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
                           "from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )

Now I want equivalent DataFrame code in both Scala and Python. I have tried some code but it's not working. My attempt is as follows:

joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')

joinDf.withColumn\
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
 pf.col(landingData('Quantity_Sold'))).show()

In the above code the join works perfectly, but the CASE condition does not. I am getting: TypeError: 'DataFrame' object is not callable. I am using Spark 2.3.2 with Python 3.7, and similarly Scala 2.11 in the Spark-Scala case. Can anyone suggest equivalent code or guidance?

  • Check your Python code: you are trying to call a DataFrame instance as if it were a function. Commented Oct 18, 2020 at 10:38

2 Answers


Here's a Scala solution, assuming landingData and preHoldData are your dataframes:

 import org.apache.spark.sql.functions.{col, when}

 // Rename the conflicting column in each dataframe before joining
 val landingDataDf = landingData.withColumnRenamed("Quantity_Sold", "Quantity_Sold_ld")
 val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold", "Quantity_Sold_phd")

 val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"), "left_outer")

 joinDf
   .withColumn("Quantity_Sold",
     when(col("Quantity_Sold_ld").isNull, col("Quantity_Sold_phd"))
       .otherwise(col("Quantity_Sold_ld")))
   .drop("Quantity_Sold_ld", "Quantity_Sold_phd")

You can do the same thing for Vendor_ID.

The problem with your code is that you cannot reference the original dataframes inside a withColumn call on the joined dataframe. Columns have to come from the dataframe you are operating on.


4 Comments

This code works fine, but the issue is that both dataframes have the same column names, so we have to rename each of those columns, otherwise we cannot select the required columns. We want only the selected columns with this CASE logic, not all of them.
@AliBinmazi you can easily drop columns. Edited the answer to drop the unwanted columns.
Now this code works fine when we add drop(). Thanks @Sanket9394
@AliBinmazi cool. Do consider accepting it as the answer if you found it useful :)

The code below works in Scala; for Python you might need to tune it a little.

val landingData = spark.table("landingData").alias("a")
val preHoldData = spark.table("preHoldData").alias("b")

landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))

2 Comments

It works fine up to the withColumn calls, but when we apply the .select function it gives an ambiguity issue with col("Quantity_Sold"). I have tried renaming it but it still does not work.
Ok, check @Sanket9394's solution. He has explained it clearly :)
