
Imagine the following input:

val data = Seq("1::Alice", "2::Bob")
val dfInput = data.toDF("input")
val dfTwoColTypeString = dfInput
  .map(row => row.getString(0).split("::"))
  .map { case Array(id, name) => (id, name) }
  .toDF("id", "name")

Now I have a DataFrame with the columns I want:

scala> dfTwoColTypeString.show
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+

Of course I would like the id column to be of type int, but it is of type string:

scala> dfTwoColTypeString.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

Therefore I define this schema:

import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true)))

What is the best way to cast or convert the DataFrame dfTwoColTypeString to the given target schema?

Bonus: If the given input cannot be cast or converted to the target schema, I would love to get a null row with an extra column "bad_record" containing the bad input data. That is, I want to accomplish the same as the CSV parser in PERMISSIVE mode.

Any help really appreciated.

  • What I am looking for is something that is as smart as the CSV reader. I.e. instead of parsing a csv file or a Dataset[String], I have a Dataset[List[String]], and like the CSV parser I would love to have a function that converts the List[String] into a List corresponding to the types given by the target schema WITHOUT manually casting every column.

4 Answers


If the conversion is required after the data have been read, code like this can be used:

import org.apache.spark.sql.functions.col

val resultDF = mySchema.fields.foldLeft(dfTwoColTypeString)((df, c) =>
  df.withColumn(c.name, col(c.name).cast(c.dataType)))
resultDF.printSchema()

Output:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

To check whether the values match the target types, code like this can be used:

val dfTwoColTypeString = dfInput
  .map(row => row.getString(0).split("::"))
  .map {
    case Array(id, name) =>
      if (ConvertUtils.canBeCasted((id, name), mySchema)) (id, name, null)
      else (null, null, id + "::" + name)
  }
  .toDF("id", "name", "malformed")

Two new functions can be defined in a helper object (here ConvertUtils):

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructType}
import scala.util.control.Exception.allCatch

object ConvertUtils {

  // Every element of the tuple must be castable to the corresponding field's type
  def canBeCasted(values: Product, mySchema: StructType): Boolean =
    mySchema.fields.zipWithIndex.forall(v =>
      canBeCasted(values.productElement(v._2).asInstanceOf[String], v._1.dataType))

  // Check a single string value against one target type
  def canBeCasted(value: String, dtype: DataType): Boolean = dtype match {
    case StringType  => true
    case IntegerType => (allCatch opt value.toInt).isDefined
    // TODO add other types here
    case _ => false
  }
}
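
As a quick sanity check in the spark-shell (purely illustrative, assuming mySchema and the ConvertUtils object above are in scope), the helper behaves like this:

ConvertUtils.canBeCasted(("1", "Alice"), mySchema)   // true: "1" parses as an Int
ConvertUtils.canBeCasted(("cc", "Bob"), mySchema)    // false: "cc" does not parse as an Int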

Output with the invalid value "cc::Bob":

+----+-----+---------+
|id  |name |malformed|
+----+-----+---------+
|1   |Alice|null     |
|null|null |cc::Bob  |
+----+-----+---------+

4 Comments

This solution is very nice and close to what I need. How can I handle the situation when a string cannot be cast to the target schema type? E.g. if the id column contains "badid", which cannot be converted to an integer. It is no problem to expand the schema to contain a column "corrupt_record", and if a value cannot be cast, the whole line should be put into the corrupt column. I.e. like the CSV parser in PERMISSIVE mode with a column for corrupt records.
"dfTwoColTypeString" can be filtered before the conversion; I guess an additional filter step is required.
Or, during "dfInput.map", the values can be checked against the schema, and if they are incorrect, the whole input string can be put into an additional column for bad records. All regular columns can be set to null for such a row.
Sounds great ... this would be the perfect solution ... since I am a Spark beginner, could you provide a code example of how to do that?

If CSV reading is required and the schema is known, it can be assigned during reading:

spark.read.schema(mySchema).csv("filename.csv")
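
A hedged extension of this idea for the question's double-colon data, assuming Spark 3.0 or later (earlier versions accept only a single-character separator, as noted in the comments below) and the spark-shell implicits in scope: the csv reader can also parse an existing Dataset[String], so the already-loaded dfInput from the question can be fed to it directly, with the question's bad_record column declared in the schema so that PERMISSIVE mode has somewhere to put unparseable lines. This is only a sketch, not tested against the original setup.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Target schema plus the column that PERMISSIVE mode fills with unparseable lines
val schemaWithBadRecord = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("bad_record", StringType, true)))

val parsed = spark.read
  .schema(schemaWithBadRecord)
  .option("sep", "::")                               // multi-character separator needs Spark 3.0+
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "bad_record")
  .csv(dfInput.as[String])                           // parse the in-memory strings, no file needed

parsed.show(false)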

4 Comments

This is exactly the problem: It is not comma separated but double colon separated ... so I need to split the input myself and can no longer use the csv reader. So this answer does not help.
The separator for reading can be changed; more here: github.com/databricks/spark-csv
And when you have multiple different separators and regular expressions to parse a line into separate values? This is something the csv reader cannot do. I need a CSV reader that can take in a Dataset[List[String]] ... i.e. the values are already separated in the form of a list, and now I just want to cast the values against the target schema, as the csv reader does in the next step. That is the functionality I am looking for.
As far as I know, the csv reader accepts only exactly one character as separator, not a string like '::' ...
import org.apache.spark.sql.functions.col

val cols = Array(col("id").cast(IntegerType), col("name"))
dfTwoColTypeString.select(cols: _*).printSchema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

// Another approach

import org.apache.spark.sql.types.{StringType, IntegerType, StructType, StructField}

val mySchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true)))

val df = spark.createDataFrame(dfTwoColTypeString.rdd, mySchema)
df.printSchema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

6 Comments

This is not what I mean. You must cast the id from string to integer BY HAND. But what I want is that this cast gets generated from the given target schema. E.g. when you read a CSV, all columns are of course first read as strings and then automatically cast using the schema given for the csv file. I.e. I should NOT have to write any code to cast the columns.
The second approach is wrong. When you create a DataFrame from an RDD, Spark assumes that the given schema fits the given RDD, BUT it DOES NOT cast or check whether all rows are valid with regard to the schema. You can see that this solution is wrong when you do df.show(false). Only then do all rows get processed, and you will see an error message saying that the column id is NOT of type integer as the schema suggests.
@Hiro.Protagonist I will cross-check the second approach again. What about the first one? Does it fit your requirement?
@Hiro.Protagonist this is what the first approach is doing; we are explicitly casting string to int.
@Chandan.Ray The first approach does it manually. It does not use the target schema. I don't want to code the conversion by hand, as the wanted conversion is clearly given by the target schema. E.g. the csv converter only needs the target schema to do the conversion. I don't need to convert it by hand.
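
A small illustrative sketch of that idea, using only names already defined in the question: the cast expressions can be derived from mySchema instead of being written out per column, which is essentially the foldLeft from the first answer expressed as a single select.

import org.apache.spark.sql.functions.col

// Build one cast expression per field of the target schema
val castCols = mySchema.fields.map(f => col(f.name).cast(f.dataType))
dfTwoColTypeString.select(castCols: _*).printSchema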

Considering dfTwoColTypeString to be a DataFrame, you can also cast its column type as below:

dfTwoColTypeString.withColumn("id", col("id").cast("Int"))

1 Comment

The task I am asking about is different. I don't want to cast explicitly by hand. I want a method that is able to cast the columns using the given target schema. This is what the csv reader does when given a schema.
