6

Is there any provision for doing "INSERT IF NOT EXISTS ELSE UPDATE" in Spark SQL?

I have a Spark SQL table "ABC" that has some records, and I have another batch of records that I want to insert or update in this table depending on whether they already exist in it.

Is there a SQL command I can use in a Spark SQL query to make this happen?

2 Comments

  • In regular Spark this would be done via a join followed by a map... Commented Aug 18, 2017 at 10:12
  • Thanks @GlennieHellesSindholt, can you please share an example? Commented Aug 21, 2017 at 13:12

3 Answers

6

In regular Spark this could be achieved with a join followed by a map like this:

import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "original"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")

val df3 = df1
  .join(df2, 'df1_id === 'df2_id, "outer")
  .map(row => {
    if (row.isNullAt(2))
      // no matching row in df2: keep the existing record from df1
      (row.getString(0), row.getString(1))
    else
      // df2 has this id (updated or brand new): take the record from df2
      (row.getString(2), row.getString(3))
  })

This yields:

scala> df3.show
+---+--------+
| _1|      _2|
+---+--------+
|id3|     new| 
|id1|     new|
|id2|original|
+---+--------+

You could also use select with UDFs instead of map, but in this particular case with null values, I personally prefer the map variant.
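
For reference, here is a minimal sketch of that select-based variant, using the built-in coalesce rather than a custom UDF (my illustration, assuming the same df1/df2 as above):

import org.apache.spark.sql.functions.coalesce

// Take each value from df2 (the new batch) when it exists, otherwise keep df1's.
val df3b = df1
  .join(df2, 'df1_id === 'df2_id, "outer")
  .select(
    coalesce('df2_id, 'df1_id).as("id"),
    coalesce('df2_status, 'df1_status).as("status")
  )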


1 Comment

This is fabulous. Is there a name for this technique?
1

You can use Spark SQL like this:

select * from (
    select c.*, row_number() over (partition by tac order by tag desc) as TAG_NUM
    from (
        select
             a.tac
            ,a.name
            ,0 as tag
        from tableA a
        union all
        select
             b.tac
            ,b.name
            ,1 as tag
        from tableB b
    ) c
) d
where TAG_NUM = 1

tac is the column you want to insert/update by.
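
As a rough sketch of how this could be run from code (the DataFrame names existingDf and newBatchDf are placeholders; the view names just have to match the query above):

// Register the existing data and the new batch as temporary views,
// then run the query; the tag = 1 rows (the new batch) win within each tac group.
existingDf.createOrReplaceTempView("tableA")
newBatchDf.createOrReplaceTempView("tableB")

val upserted = spark.sql("""
  select * from (
    select c.*, row_number() over (partition by tac order by tag desc) as TAG_NUM
    from (
      select a.tac, a.name, 0 as tag from tableA a
      union all
      select b.tac, b.name, 1 as tag from tableB b
    ) c
  ) d where TAG_NUM = 1
""")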


0

I know it's a bit late to share my code, but to add to or update my database, I wrote a function that looks like this:

import pandas as pd

# Returns a Spark dataframe with the added and updated data.
# key is the primary key column of the dataframes.
# dfToUpdate and dfToAddAndUpdate are Spark dataframes.
def AddOrUpdateDf(dfToUpdate, dfToAddAndUpdate, key):
    # Cast the Spark dataframe dfToUpdate to a pandas dataframe
    dfToUpdatePandas = dfToUpdate.toPandas()

    # Cast the Spark dataframe dfToAddAndUpdate to a pandas dataframe
    dfToAddAndUpdatePandas = dfToAddAndUpdate.toPandas()

    # Update existing records with the latest version and append new ones:
    # keep='last' keeps the row from dfToAddAndUpdate when the key already exists.
    addOrUpdatePandasDf = (
        pd.concat([dfToUpdatePandas, dfToAddAndUpdatePandas])
        .drop_duplicates([key], keep='last')
        .sort_values(key)
    )

    # Cast back to a Spark dataframe and return it
    return spark.createDataFrame(addOrUpdatePandasDf)

As you can see, we need to convert the Spark dataframes to pandas dataframes to be able to use pd.concat and, especially, drop_duplicates with keep='last'; then we convert back to a Spark dataframe and return it. I don't think this is the best way to handle the add-or-update, but at least it works.

2 Comments

Don't use Pandas, it will slow down your code! It won't scale across several nodes!
How can I achieve that with Spark dataframes?
