
I am new to Scala and Spark, and I am trying to read a CSV file locally (for testing):

val spark = org.apache.spark.sql.SparkSession.builder
  .master("local")
  .appName("Spark CSV Reader")
  .getOrCreate()

val topics_df = spark.read.format("csv").option("header", "true").load("path-to-file.csv")
topics_df.show(10)

Here's what the file looks like:

+-----+--------------------+--------------------+
|topic|         termindices|         termweights|
+-----+--------------------+--------------------+
|   15|[21,31,51,108,101...|[0.0987100701,0.0...|
|   16|[42,25,121,132,55...|[0.0405490884,0.0...|
|    7|[1,23,38,7,63,0,1...|[0.1793091892,0.0...|
|    8|[13,40,35,104,153...|[0.0737646511,0.0...|
|    9|[2,10,93,9,158,18...|[0.1639456608,0.1...|
|    0|[28,39,71,46,123,...|[0.0867449145,0.0...|
|    1|[11,34,36,110,112...|[0.0729913664,0.0...|
|   17|[6,4,14,82,157,61...|[0.1583892199,0.1...|
|   18|[9,27,74,103,166,...|[0.0633899386,0.0...|
|   19|[15,81,289,218,34...|[0.1348582482,0.0...|
+-----+--------------------+--------------------+

with

ReadSchema: struct<topic:string,termindices:string,termweights:string>

The termindices column is supposed to be of type Array[Int], but when saved to CSV it becomes a String (this usually isn't a problem when pulling from a database, since column types are preserved).

How do I convert the types and eventually cast the DataFrame to:

case class TopicDFRow(topic: Int, termIndices: Array[Int], termWeights: Array[Double])

I have the function ready to perform the conversion:

termIndices.substring(1, termIndices.length - 1).split(",").map(_.toInt)

I have looked at UDFs and a few other solutions, but I am convinced there must be a cleaner and faster way to perform this conversion (a sketch of the UDF version I had in mind is below). Any help is greatly appreciated!
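For reference, the UDF route would look roughly like the following sketch; parseIntArray and parseDoubleArray are names made up for illustration:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Hypothetical UDFs wrapping the substring/split conversion above.
val parseIntArray = udf { s: String =>
  s.substring(1, s.length - 1).split(",").map(_.trim.toInt)
}
val parseDoubleArray = udf { s: String =>
  s.substring(1, s.length - 1).split(",").map(_.trim.toDouble)
}

val converted = topics_df
  .withColumn("termindices", parseIntArray($"termindices"))
  .withColumn("termweights", parseDoubleArray($"termweights"))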

  • Read it in as a text file, use map on the RDD, and finally createDataFrame with your schema. Commented Dec 6, 2018 at 5:30
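A rough sketch of that suggestion, assuming each line has the shape topic,"[i1,i2,...]","[w1,w2,...]" (the regex, header handling, and file layout are assumptions, not tested against the real file):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assumed line layout: topic,"[i1,i2,...]","[w1,w2,...]"
val linePattern = """^(\d+),"\[(.*?)\]","\[(.*?)\]"$""".r

val schema = StructType(Seq(
  StructField("topic", IntegerType),
  StructField("termindices", ArrayType(IntegerType)),
  StructField("termweights", ArrayType(DoubleType))
))

val lines = spark.sparkContext.textFile("path-to-file.csv")
val header = lines.first()
// Lines that don't match the pattern would throw a MatchError;
// a real version would need to handle them.
val rows = lines.filter(_ != header).map {
  case linePattern(topic, indices, weights) =>
    Row(topic.toInt,
        indices.split(",").map(_.trim.toInt).toSeq,
        weights.split(",").map(_.trim.toDouble).toSeq)
}

val parsedDf = spark.createDataFrame(rows, schema)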

1 Answer


UDFs should be avoided when the more efficient built-in Spark functions can do the job. To my knowledge there is no better approach than the one you propose: remove the first and last characters of the string, split on commas, and cast.

Using the built-in functions, this can be done as follows:

df.withColumn("termindices", split($"termindices".substr(lit(2), length($"termindices")-2), ",").cast("array<int>"))
  .withColumn("termweights", split($"termweights".substr(lit(2), length($"termweights")-2), ",").cast("array<double>"))
  .as[TopicDFRow]

substr is 1-indexed, so to remove the first character we start from 2. The second argument is the length to take (not the end position), hence the -2, which also drops the closing bracket.

The final .as[TopicDFRow] casts the DataFrame to a Dataset of type TopicDFRow. Note that topic also needs to be cast to int first, since the CSV reader loads it as a string and the encoder will otherwise refuse the up-cast.
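As a quick sanity check, the same chain run on a single hand-built row (the values below are made up) should print roughly the following schema:

import org.apache.spark.sql.functions.{length, lit, split}
import spark.implicits._

// One made-up row, just to verify the parsing chain end to end.
val sample = Seq(("15", "[21,31,51]", "[0.0987,0.0405]"))
  .toDF("topic", "termindices", "termweights")

val parsed = sample
  .withColumn("topic", $"topic".cast("int"))
  .withColumn("termindices", split($"termindices".substr(lit(2), length($"termindices") - 2), ",").cast("array<int>"))
  .withColumn("termweights", split($"termweights".substr(lit(2), length($"termweights") - 2), ",").cast("array<double>"))
  .as[TopicDFRow]

parsed.printSchema()
// root
//  |-- topic: integer (nullable = true)
//  |-- termindices: array (nullable = true)
//  |    |-- element: integer (containsNull = true)
//  |-- termweights: array (nullable = true)
//  |    |-- element: double (containsNull = true)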
