
I have a primary SQL table that I am reading into Spark and modifying to write to CassandraDB. I currently have a working implementation for converting a gender from the integers 0, 1, 2, 3 to the strings "Male", "Female", "Trans", etc. Although the method below works, it seems very inefficient to build a separate Array with those mappings, turn it into a DataFrame, join it into the main table/DataFrame, and then drop and rename columns.

I have seen:

.withColumn("gender", when(col("gender") === 1, "male").otherwise("female"))

which would allow me to continue method chaining on the primary table, but I have not been able to get it working with more than two options. Is there a way to do this? I have around 10 different columns on this table that each need their own custom conversion. Since this code will be processing TBs of data, is there a less repetitive and more efficient way to accomplish this? Thanks for any help in advance!

case class Gender(tmpid: Int, tmpgender: String)

private def createGenderDf(spark: SparkSession): DataFrame = {
  import spark.implicits._
  Seq(
    Gender(1, "Male"),
    Gender(2, "Female"),
    Gender(777, "Prefer not to answer")
  ).toDF
}


private def createPersonsDf(spark: SparkSession): DataFrame = {
  val genderDf = createGenderDf(spark)
  genderDf.show()

  val personsDf: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load(dataPath + "people.csv")
    .withColumnRenamed("ID", "id")
    .withColumnRenamed("name_first", "firstname")

  // Left-join the lookup DataFrame, then drop and rename to restore the schema.
  val personsDf1: DataFrame = personsDf
    .join(genderDf, personsDf("gender") === genderDf("tmpid"), "leftouter")

  personsDf1
    .drop("gender")
    .drop("tmpid")
    .withColumnRenamed("tmpgender", "gender")
}

1 Answer
You can use nested when functions, which eliminates the need to create genderDf, join, drop, rename, etc. For your example you can do the following:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

personsDf.withColumn(
  "gender",
  when(col("gender") === 1, "male")
    .otherwise(when(col("gender") === 2, "female")
      .otherwise("Prefer not to answer"))
    .cast(StringType)
)

You can add more when functions to the nested structure, and you can repeat the same pattern for the other 10 columns as well.
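Since the question mentions about 10 columns that each need their own conversion, one way to avoid writing the nested structure ten times is to drive it from a data structure. This is only a sketch under assumed mappings: `codeMaps`, `applyCodeMaps`, and the `status` column are hypothetical names invented for illustration, not part of the original code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical mapping tables -- substitute your real code-to-label mappings.
val codeMaps: Map[String, Map[Int, String]] = Map(
  "gender" -> Map(1 -> "Male", 2 -> "Female", 777 -> "Prefer not to answer"),
  "status" -> Map(0 -> "Inactive", 1 -> "Active")
)

// Fold each mapping into a nested when(...).otherwise(...) expression and
// rewrite every listed column in place -- no joins, drops, or renames needed.
def applyCodeMaps(df: DataFrame): DataFrame =
  codeMaps.foldLeft(df) { case (acc, (colName, mapping)) =>
    val labelExpr = mapping.foldLeft(lit(null).cast("string")) {
      case (expr, (code, label)) => when(col(colName) === code, label).otherwise(expr)
    }
    acc.withColumn(colName, labelExpr)
  }
```

Unmapped codes fall through to the initial `lit(null)` default; swap that for a literal such as `"Unknown"` if nulls are unwanted.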


3 Comments

Is there a way to do this in the gender column that already exists (it is defined as Int type) with some kind of column type conversion or will I have to create a new column with type String and then conditionally set values into that column based on the gender column? I guess this second option would also require a column delete and then a column rename too...
withColumn will create a new column if the gender column doesn't exist, and it will replace the values in that column if it already exists. To change the datatype of a column in the withColumn API, you can use cast. Please see my updated answer.
Thank you for the help! Excellent answer that explained exactly what I needed
