
Suppose we have a CSV file with a header like:

fname,age,class,dob

And a second CSV file with a header like:

f_name,user_age,class,DataofBith

I am trying to map every CSV file onto a common header, so that each one returns a DataFrame with the same, standardized column names, e.g.:

first_name,age,class,dob
val df2 = df.withColumnRenamed("DataofBith", "dob").withColumnRenamed("fname", "first_name")
df2.printSchema()

But that approach is not generic. Can this be done dynamically for every CSV, driven by a standard conversion table, so that for example both fname and f_name are converted to first_name?

  • Do these CSV files have the same data structure, just with differently formatted header names, and you want the DataFrame created from each file to have the same header? Commented Jan 22, 2020 at 9:06
  • Yes, the data structure is the same, but the position of the columns can change. Commented Jan 22, 2020 at 9:12
  • The CSV may arrive with any header format. I need to convert the header to the standard one so that my Spark SQL queries do not need to change. Commented Jan 22, 2020 at 9:52
  • Hi @jackson, did any of the answers below work for you? Commented Feb 7, 2020 at 10:33
  • There was a best answer which was deleted from this post; I am searching for that one. Commented Feb 7, 2020 at 12:08

3 Answers


You can build a list describing the schema mapping and then iterate over it, like below.

Approach 1:

import spark.implicits._  // needed for toDF on a Seq

val df = Seq((1, "goutam", "kumar"), (2, "xyz", "kumar")).toDF("id", "fname", "lname")
// each pair maps an old column name to its new name
val schema = Seq("id" -> "sid", "fname" -> "sfname", "lname" -> "slname")
val mappedSchema = schema.map { case (oldName, newName) => df(oldName).as(newName) }
df.select(mappedSchema: _*)
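For reference, the select above should yield the renamed columns:

df.select(mappedSchema: _*).printSchema()
// root
//  |-- sid: integer (nullable = false)
//  |-- sfname: string (nullable = true)
//  |-- slname: string (nullable = true)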

While reading the CSV, pass option("header", false); the columns then come in with default positional names, so you can skip mapping the old header names entirely and just assign the new names by position.
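A minimal sketch of that idea (the path here is an assumption, and if the file does contain a header row it would still need to be filtered out):

val raw = spark.read.option("header", false).csv("/tmp/users.csv")  // hypothetical path
// columns arrive as _c0, _c1, ... and are renamed purely by position
val standardized = raw.toDF("first_name", "age", "class", "dob")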

Approach 2:

val schema = Seq("sid", "sfname", "slname")
// zip pairs each existing column name with its replacement, by position
val mappedSchema = df.columns.zip(schema)
val mappedSchemaWithDF = mappedSchema.map { case (oldName, newName) => df(oldName).as(newName) }
df.select(mappedSchemaWithDF: _*)



The function withColumnRenamed is a no-op when the column is not present in the DataFrame. Hence you can read all the DataFrames, apply the same renaming logic everywhere, and union them all later.

import org.apache.spark.sql.DataFrame

def renaming(df: DataFrame): DataFrame = {
  df.withColumnRenamed("dob", "DateOfBirth")
    .withColumnRenamed("fname", "name")
    .withColumnRenamed("f_name", "name")
    .withColumnRenamed("user_age", "age")
  // append more renaming calls here
}

val df1 = renaming(spark.read.csv("...your path here"))

val df2 = renaming(spark.read.csv("...another path here"))

val result = df1.union(df2)  // unionAll is deprecated since Spark 2.0; union is equivalent

result will have the same schema (DateOfBirth, name, age) in this case.
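As the comments below ask whether the renames can be passed dynamically: the same logic can be driven by a Map with a foldLeft, so the conversion table lives in one place. A sketch using the renames from this answer:

val renames = Map(
  "dob"      -> "DateOfBirth",
  "fname"    -> "name",
  "f_name"   -> "name",
  "user_age" -> "age"
)

// withColumnRenamed is a no-op for absent columns, so one Map covers all files
def renaming(df: DataFrame): DataFrame =
  renames.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }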

Edit:

Following your input, if I understand correctly what you need to do, what about this?

val df1 = spark.read.csv("...your path here").toDF("name", "age", "class", "born_date")

val df2 = spark.read.csv("...another path here").toDF("name", "age", "class", "born_date")
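Note that toDF renames purely by position, so this variant only works if every file lists its columns in the same order; as the comments on the question point out, that order may vary between files.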

5 Comments

  • This would not work, because the column names are passed statically; it should be dynamic.
  • Also, I do not need to merge the CSVs. The CSV may arrive with any header format. I need to convert the header to the standard one so that my Spark SQL queries do not need to change.
  • Check my edited reply. I am not sure I understand what you need exactly, though.
  • Can I pass the header replacements dynamically?
  • Can I pass the "// append more renaming calls here" part dynamically, where the first value comes from the header of the CSV and the second value comes from a statically defined conversion list (the standard conversion)?

You can use a simple select in combination with a Scala Map. It is easier to handle the column transformations via a dictionary (Map) whose keys are the old names and whose values are the new names.

Let's first create the two datasets as you described them:

val df1 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("fname", "age", "class", "dob")

val df2 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("f_name", "user_age", "class", "DataofBith")

Next we create a Scala function named transform which accepts two arguments: the target df, and a mapping which contains the transformation details:


val mapping = Map(
  "fname" -> "first_name",
  "f_name" -> "first_name",
  "user_age" -> "age",
  "DataofBith" -> "dob"
)

import org.apache.spark.sql.DataFrame

def transform(df: DataFrame, mapping: Map[String, String]): DataFrame = {
  val keys = mapping.keySet
  val cols = df.columns.map { c =>
    if (keys.contains(c))
      df(c).as(mapping(c))  // rename via alias
    else
      df(c)                 // leave untouched
  }

  df.select(cols: _*)
}

The function goes through the given columns, first checking whether the current column exists in mapping. If so, it renames it using the corresponding value from the dictionary; otherwise the column remains untouched. Note that this just renames the column (via an alias), hence we don't expect it to affect performance.

Finally, some examples:

val newDF1 = transform(df1, mapping)
newDF1.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+


val newDF2 = transform(df2, mapping)
newDF2.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+
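Once every file has been passed through transform, the standardized DataFrames can be combined by name, for example (assuming Spark 2.3+ for unionByName):

// matches columns by name, so the original column order no longer matters
val combined = newDF1.unionByName(newDF2)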

