
I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on user input, but I'm not able to build one on the fly.

Is there any functionality to create a Row from a List or an Array?

For example, if I have a .csv file with the following format,

"91xxxxxxxxxx,21.31,15,0,0"

If the user inputs [1, 2], then I need to take only the 2nd and 3rd columns, along with the customer_id, which is the first column.

I tried to parse it with this code:

val l3 = sc.textFile("/SparkTest/abc.csv").map(line => foo(input, line))

where foo is defined as

def foo(input: List[Int], s: String): Row = {
    val n = input.length
    val out = new Array[Any](n + 1)
    val r = s.split(",")
    out(0) = r(0)
    for (i <- 1 to n)
        out(i) = r(input(i - 1)).toDouble
    Row(out)
}

and input is a List, say

val input = List(1,2)

Executing this code, I get l3 as:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])

But what I want is:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([91xxxxxxxxxx,21.31,15])

This then has to be passed when creating a schema in Spark SQL.

3 Answers


Something like the following should work:

import org.apache.spark.sql._

def f(n: List[Int], s: String) : Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
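
For example, with the question's sample line, f(List(0, 1, 2), "91xxxxxxxxxx,21.31,15,0,0") would give [91xxxxxxxxxx,21.31,15]. Note that the customer_id column has to be included in the index list here, since only the listed indices are kept.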

1 Comment

This works fine if I want to parse it as a single row of 3 String values. But how can I use it if the first value is a String and the 2nd and 3rd values are Doubles? Is that possible?
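
One possible way, as a sketch (assuming the first column should stay a String and the columns picked by n should become Doubles; fTyped is just a made-up name):

import org.apache.spark.sql._

// keep column 0 as a String and convert the selected columns to Double
def fTyped(n: List[Int], s: String): Row = {
  val cols = s.split(",")
  Row.fromSeq(cols(0) +: n.map(i => cols(i).toDouble))
}

// fTyped(List(1, 2), "91xxxxxxxxxx,21.31,15,0,0") gives [91xxxxxxxxxx,21.31,15.0]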

You are missing the creation of the StructField and StructType. Refer to the official guide (http://spark.apache.org/docs/latest/sql-programming-guide.html), section "Programmatically Specifying the Schema".

I'm not a Scala specialist, but in Python it would look like this:

from pyspark.sql import *
sqlContext = SQLContext(sc)

input = [1, 2]

def parse(line):
    global input
    l = line.split(',')
    res = [l[0]]              # always keep the first column (customer_id)
    for ind in input:
        res.append(l[ind])    # append the user-selected columns
    return res

csv  = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))

# one field for customer_id plus one per selected column, all as strings
fieldnum = len(input) + 1
fields = [StructField("col" + str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)

csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()

In short, you should not convert the lines to Row objects directly; just leave them as an RDD and apply a schema to it with applySchema.
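
For reference, a rough Scala sketch of the same idea, using the createDataFrame API that later replaced applySchema (the column names, types, and file path below are assumptions):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// user-selected column indices, as in the question
val input = List(1, 2)

// assumed schema: customer_id as a string, selected columns as doubles
val schema = StructType(
  StructField("customer_id", StringType, nullable = true) +:
  input.map(i => StructField("col" + i, DoubleType, nullable = true))
)

val rowRDD = sc.textFile("/SparkTest/abc.csv").map { line =>
  val cols = line.split(",")
  Row.fromSeq(cols(0) +: input.map(i => cols(i).toDouble))
}

// Spark 1.3+ replacement for sqlContext.applySchema(rowRDD, schema)
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()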

3 Comments

Nice solution. Please keep in mind that sqlContext.applySchema is deprecated in Spark 2.x, so it's better to use the DataFrame-based solution.
What if the data is nested, e.g. we have a StructType inside the schema? (See the sketch after these comments.)
The question is about how to create an RDD of Row objects; here you create a DataFrame.
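
Regarding the nested-data comment above, a minimal sketch of how a nested field can be declared and populated (the field names and values are made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// hypothetical schema with a nested struct column
val nestedSchema = StructType(Seq(
  StructField("customer_id", StringType, nullable = true),
  StructField("address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("zip", StringType, nullable = true)
  )), nullable = true)
))

// a nested StructType field is populated with a nested Row
val nestedRow = Row("91xxxxxxxxxx", Row("some city", "12345"))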

You can also try:

    Row.fromSeq(Seq(line(0).toString, line(1).toDouble, line(2).toDouble) ++ line.slice(3, line.size).map(value => value.toString))
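
This assumes line is the already-split CSV record, for example:

    val line = "91xxxxxxxxxx,21.31,15,0,0".split(",")

which would produce [91xxxxxxxxxx,21.31,15.0,0,0].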

1 Comment

Only for Scala versions 2.12 and up.
