
I need updatedDF to have the new column names, but it is not updated: it still returns the old columns and their names.

val schema = "sku_cd#sku_code,ean_nbr#ean,vnr_cd#nan_key,dsupp_pcmdty_desc#pack_descr" 

val schemaArr = schema.split(",")

var df = spark.sql("""select sku_code, ean , nan_key, pack_descr from db.products""")

val updatedDF = populateAttributes(df,schemaArr)


 def populateAttributes(df: DataFrame, schemaArr: Array[String]): DataFrame = {
   for (i <- schemaArr) {
     val targetCol = i.split("#")(0)
     val sourceCol = i.split("#")(1)
     df.withColumn(targetCol, col(sourceCol))
   }
   df
 }

I get the output below, which is incorrect:

 scala> updatedDF.printSchema
 root
 |-- sku_code: string (nullable = true)
 |-- ean: string (nullable = true)
 |-- nan_key: string (nullable = true)
 |-- pack_descr: string (nullable = true)

Expected output

 |-- sku_cd: string (nullable = true)
 |-- ean_nbr: string (nullable = true)
 |-- vnr_cd: string (nullable = true)
 |-- dsupp_pcmdty_desc: string (nullable = true)

2 Answers


You are not updating the dataframe in your for loop. The line:

df.withColumn(targetCol, col(sourceCol))

creates a new DataFrame and returns it; the result is discarded and df itself never changes, because DataFrames are immutable.

You can use a var so the DataFrame can be reassigned on each iteration. Also, use withColumnRenamed to rename a column:

df = df.withColumnRenamed(sourceCol, targetCol)
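Putting that together, a minimal sketch of the corrected function using a local var (function name and `#`-split logic taken from the question; Spark's DataFrame API assumed):

```scala
import org.apache.spark.sql.DataFrame

def populateAttributes(df: DataFrame, schemaArr: Array[String]): DataFrame = {
  var result = df // local var: keep each iteration's new DataFrame
  for (i <- schemaArr) {
    val targetCol = i.split("#")(0) // new name, e.g. "sku_cd"
    val sourceCol = i.split("#")(1) // existing name, e.g. "sku_code"
    result = result.withColumnRenamed(sourceCol, targetCol)
  }
  result
}
```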

Or better, use foldLeft:

def populateAttributes(df: DataFrame, schemaArr: Array[String]): DataFrame =
  schemaArr.foldLeft(df) { (acc, m) =>
    val mapping = m.split("#")
    acc.withColumnRenamed(mapping(1), mapping(0))
  }
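To see why foldLeft works where the plain for loop did not, here is the same accumulator-threading pattern on an ordinary immutable Map (plain Scala, no Spark needed; the sample data is illustrative):

```scala
// Each step receives the previous result as `acc` and returns a new value,
// just as the answer threads the renamed DataFrame through foldLeft.
val schemaArr = Array("sku_cd#sku_code", "ean_nbr#ean")
val row = Map("sku_code" -> "A1", "ean" -> "123")

val renamed = schemaArr.foldLeft(row) { (acc, m) =>
  val mapping = m.split("#")
  acc.get(mapping(1)) match {
    case Some(v) => acc - mapping(1) + (mapping(0) -> v) // "rename" the key
    case None    => acc                                  // key absent: no-op
  }
}
// renamed == Map("sku_cd" -> "A1", "ean_nbr" -> "123")
```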

Another way, using a select expression:

val selectExpr = schemaArr.map(m => {
  val mapping = m.split("#")
  col(mapping(1)).as(mapping(0))
})

val updatedDF = df.select(selectExpr:_*)



Just another way to do what blackbishop did:

val schema = "sku_cd#sku_code,ean_nbr#ean,vnr_cd#nan_key,dsupp_pcmdty_desc#pack_descr" 

val schemaArr = schema.split(",").toSeq

val outputDF = schemaArr.foldLeft(inputDF)((df, x) => df.withColumnRenamed(x.split('#')(1), x.split('#')(0)))

