
I am trying to build an Edge RDD for GraphX. I am reading a CSV file, converting it to a DataFrame, and then trying to convert that to an Edge RDD:

val staticDataFrame = spark.
  read.
  option("header", true).
  option("inferSchema", true).
  csv("/projects/pdw/aiw_test/aiw/haris/Customers_DDSW-withDN$.csv")

val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]]  = 
  staticDataFrame.select(
    "dealer_customer_number",
    "parent_dealer_cust_number",
    "dealer_code"
  ).map{ (row: Array) => 
    Edge((
      row.getAs[Long]("dealer_customer_number"), 
      row.getAs[Long]("parent_dealer_cust_number"),
      row("dealer_code")
    ))
  }

But I am getting this error:

<console>:81: error: class Array takes type parameters
       val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]]  = staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").map((row: Array) => Edge((row.getAs[Long]("dealer_customer_number"), row.getAs[Long]("parent_dealer_cust_number"), row("dealer_code"))))
                                                                                                                                                                      ^

The result of

staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").take(1)

is

res3: Array[org.apache.spark.sql.Row] = Array([0000101,null,B110])
  • Could you please re-run the compilation on the indented code? The monstrous line is completely unreadable in the error message too, but I'm not sure how to fix that. Use the gray edit button. Commented Mar 6, 2018 at 3:12
  • Okay, thank you. I will re-run the code. Commented Mar 6, 2018 at 3:15
  • Same error after running the indented code. :( Commented Mar 6, 2018 at 3:17
  • Of course it's the same error. But now the line number is actually meaningful, because it does not point to the gargantuan single-line query. Commented Mar 6, 2018 at 3:18

1 Answer


First, Array takes type parameters, so you would have to write Array[Something]. But this is probably not what you want anyway.

A DataFrame is a Dataset[Row], not a Dataset[Array[_]], so you have to change

.map{ (row: Array) => 

to

.map{ (row: Row) =>

Or just omit the type annotation completely (it should be inferred):

.map{ row =>
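For reference, here is a minimal sketch of the whole conversion with the Row fix applied. It assumes Spark SQL and GraphX are on the classpath; the object and method names (EdgeRddSketch, toEdgeRdd) and the sample rows are hypothetical. Besides typing the lambda parameter as Row, it makes a few other changes that the original snippet appears to need. It calls .rdd first, because map on a Dataset returns another Dataset rather than an RDD. It declares Edge[String], because the type parameter of Edge is only the edge attribute type. It passes source, destination and attribute as three separate arguments instead of a single tuple. It casts the ID columns to LongType, in case inferSchema picked IntegerType for values like 0000101 (an assumption about the real CSV). Finally, it drops rows with a null parent, such as the [0000101,null,B110] row shown above; that is one possible choice, not the only one.

```scala
import org.apache.spark.graphx.{Edge, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

object EdgeRddSketch {
  // Hypothetical helper: turns the dealer DataFrame into GraphX edges whose
  // attribute is the dealer code, hence Edge[String].
  def toEdgeRdd(df: DataFrame): RDD[Edge[String]] =
    df.select(
        // Cast IDs to long in case inferSchema chose IntegerType (assumption).
        col("dealer_customer_number").cast(LongType).as("src"),
        col("parent_dealer_cust_number").cast(LongType).as("dst"),
        col("dealer_code")
      )
      .na.drop(Seq("src", "dst")) // skip rows whose parent (or own id) is null
      .rdd                        // Dataset.map would return a Dataset, not an RDD
      .map { (row: Row) =>        // each element is a Row, not an Array
        Edge(
          row.getAs[VertexId]("src"),
          row.getAs[VertexId]("dst"),
          row.getAs[String]("dealer_code")
        )
      }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("edge-rdd-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory stand-in for the CSV file.
    val staticDataFrame = Seq(
      (101L, Option(200L), "B110"),
      (102L, Option.empty[Long], "B111")
    ).toDF("dealer_customer_number", "parent_dealer_cust_number", "dealer_code")

    toEdgeRdd(staticDataFrame).collect().foreach(println)
    spark.stop()
  }
}
```

With the sample rows, only the first row has a parent, so a single edge from 101 to 200 with attribute "B110" should be produced.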

2 Comments

Thank you for the right answer! I am able to make the code work with a few changes; as a new Scala learner I would appreciate it if you could explain how adding { instead of ( after map changes the context of the map function. Thank you!
@HarisIrshad In this particular example, the kind of brackets used shouldn't change anything at all. I've used {} instead of () because that's my personal preference for longer map clauses with lambdas that declare their parameters explicitly. I do this because it's easier to change such a declaration to a pattern-matching { case (...) => }, which does require {} instead of (). So it's essentially just a strategy to keep the code a tiny bit more modifiable; it's not essential for your problem. The type of row was what caused the error.
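A small pure-Scala sketch of the point in the reply above; the object name and the sample pairs are hypothetical. A single-parameter lambda behaves the same whether it is passed inside (...) or {...}, while a pattern-matching anonymous function of the form { case (...) => ... } is itself written with braces.

```scala
object BracesSketch {
  // Hypothetical sample data: (customer id, dealer code) pairs.
  val pairs: List[(Long, String)] = List((101L, "B110"), (102L, "B111"))

  // Parentheses and braces around an explicitly typed lambda are equivalent here.
  val withParens: List[String] = pairs.map((p: (Long, String)) => p._2)
  val withBraces: List[String] = pairs.map { (p: (Long, String)) => p._2 }

  // A pattern-matching anonymous function is written inside braces;
  // it destructures each tuple into named parts.
  val withCase: List[String] = pairs.map { case (id, code) => s"$code-$id" }

  def main(args: Array[String]): Unit = {
    println(withParens) // List(B110, B111)
    println(withBraces) // List(B110, B111)
    println(withCase)   // List(B110-101, B111-102)
  }
}
```

Switching from the braced lambda to the case form only requires replacing the parameter declaration, which is the "tiny bit more modifiable" benefit described above.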
