
I'm working with the Spark MLlib PrefixSpan algorithm. I had some code working in Spark 1.6, but we recently moved to Spark 2.2.

I have a DataFrame like this:

viewsPurchasesGrouped: org.apache.spark.sql.DataFrame = [session_id: decimal(29,0), view_product_ids: array<bigint> ... 1 more field]

root
 |-- session_id: decimal(29,0) (nullable = true)
 |-- view_product_ids: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- purchase_product_ids: array (nullable = true)
 |    |-- element: long (containsNull = true)
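
For a self-contained reproduction, hypothetical sample data with the same schema can be built like this (a sketch, not from the original post; the values are invented and only the types match, and spark is assumed to be the usual spark-shell SparkSession):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema matching the printSchema output above.
val schema = StructType(Seq(
  StructField("session_id", DecimalType(29, 0)),
  StructField("view_product_ids", ArrayType(LongType)),
  StructField("purchase_product_ids", ArrayType(LongType))
))

// Invented sample rows; only the types are meaningful.
val sampleRows = Seq(
  Row(BigDecimal(1), Seq(123L, 234L, 456L), Seq(678L)),
  Row(BigDecimal(2), Seq(234L, 789L), Seq(123L, 456L))
)

val viewsPurchasesGrouped =
  spark.createDataFrame(spark.sparkContext.parallelize(sampleRows), schema)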

In Spark 1.6, I used this piece of code to convert it to the appropriate shape for MLlib consumption:

import scala.collection.mutable.WrappedArray

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray), 
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)

Since our switch, this doesn't work anymore.

I've tried this:

val viewsPurchasesRddString2 = viewsPurchasesGrouped.select("view_product_ids","purchase_product_ids").rdd.map( row =>
  Array(
    row.getSeq[Long](0).toArray, 
    row.getSeq[Long](1).toArray
  )
) 

and get this baffling error message, which suggests it took session_id and purchase_product_ids out of the original DataFrame instead of view_product_ids and purchase_product_ids:

Job aborted due to stage failure: [...] scala.MatchError: [14545234113341303814564569524,WrappedArray(123, 234, 456, 678, 789)]
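
One way to rule out positional-index confusion (a sketch, not from the original post; the variable name is mine) is to access the columns by name instead:

// Rows coming out of DataFrame.rdd keep their schema, so getAs by field name works.
val byName = viewsPurchasesGrouped.rdd.map { row =>
  Array(
    row.getAs[Seq[Long]]("view_product_ids").toArray,
    row.getAs[Seq[Long]]("purchase_product_ids").toArray
  )
}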

I've also tried this:

val viewsPurchasesRddString = viewsPurchasesGrouped.map {
   case Row(session_id: Long, view_product_ids: Array[Long], purchase_product_ids: Array[Long]) => 
     (view_product_ids, purchase_product_ids)
}

which fails with

viewsPurchasesRddString: org.apache.spark.sql.Dataset[(Array[Long], Array[Long])] = [_1: array<bigint>, _2: array<bigint>]
prefixSpan: org.apache.spark.mllib.fpm.PrefixSpan = org.apache.spark.mllib.fpm.PrefixSpan@10d69876
<console>:67: error: overloaded method value run with alternatives:
  [Item, Itemset <: Iterable[Item], Sequence <: Iterable[Itemset]](data: org.apache.spark.api.java.JavaRDD[Sequence])org.apache.spark.mllib.fpm.PrefixSpanModel[Item] <and>
  [Item](data: org.apache.spark.rdd.RDD[Array[Array[Item]]])(implicit evidence$1: scala.reflect.ClassTag[Item])org.apache.spark.mllib.fpm.PrefixSpanModel[Item]
 cannot be applied to (org.apache.spark.sql.Dataset[(Array[Long], Array[Long])])
   val model = prefixSpan.run(viewsPurchasesRddString)
                          ^
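
The overload error says run wants an RDD[Array[Array[Item]]], not a Dataset of tuples. A sketch of the remaining conversion (note the Row pattern above would also need Seq[Long] rather than Array[Long], and a BigDecimal session_id, to match at runtime):

// Each session becomes one sequence of two itemsets: views, then purchases.
val sequences = viewsPurchasesRddString.rdd.map {
  case (views, purchases) => Array(views, purchases)
}
// sequences: org.apache.spark.rdd.RDD[Array[Array[Long]]]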

How do I port my code correctly?

  • It's hard to answer without having the data schema and more context. Ideally, you could hand us a notebook with data and runnable code (appreciated but not expected). Commented Mar 23, 2018 at 17:29
  • @StevenBlack Thanks for your answer. I have added the schema of viewsPurchasesGrouped. There isn't much more context, except the origin of viewsPurchasesGrouped, which is a Hive table. If you can point me to a resource for how to make a notebook that is independent of the Hive table but still contains data with the same schema, I will also gladly provide a working notebook! Commented Mar 26, 2018 at 9:46

1 Answer


Your DataFrame suggests that the columns are of type array<string>, so you should not access them using Seq[Long]. In Spark 1.6, calling map on a DataFrame automatically switched to the RDD API; in Spark 2 you need to use rdd.map explicitly to do the same thing. So I would suggest this should work:

import scala.collection.mutable.WrappedArray

// Note the explicit .rdd: in Spark 2, DataFrame.map returns a Dataset, not an RDD.
val viewsPurchasesRddString = viewsPurchasesGrouped.rdd.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray),
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)
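
Since the schema in the question actually shows array<bigint> (i.e. Long elements), an end-to-end sketch with the corrected element type might look like this (the minSupport and maxPatternLength values are illustrative, not from the thread):

import scala.collection.mutable.WrappedArray
import org.apache.spark.mllib.fpm.PrefixSpan

// One sequence per session, each product-id array as one itemset.
val sequences = viewsPurchasesGrouped.rdd.map( row =>
  Array(
    row.getAs[WrappedArray[Long]]("view_product_ids").toArray,
    row.getAs[WrappedArray[Long]]("purchase_product_ids").toArray
  )
)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.1)        // illustrative value
  .setMaxPatternLength(5)    // illustrative value
val model = prefixSpan.run(sequences)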

2 Comments

I just realized that it did say Array[String]; this was copied from a wrong place. I just fixed it!
Your version works! This is the adapted code:

val viewsPurchasesRddString = viewsPurchasesGrouped.rdd.map( row =>
  Array(
    Array(row.getAs[WrappedArray[Long]](1).toArray),
    Array(row.getAs[WrappedArray[Long]](2).toArray)
  )
)

It also works like this:

val viewsPurchasesRddString = viewsPurchasesGrouped.rdd.map( row =>
  Array(
    row.getAs[WrappedArray[Long]](1).toArray,
    row.getAs[WrappedArray[Long]](2).toArray
  )
)
