
I've been trying to convert an RDD to a DataFrame. For that, the element types need to be concrete and not Any. I'm using Spark MLlib's PrefixSpan; that's where freqSequence.sequence comes from. I start with a DataFrame that contains session IDs, views and purchases as string arrays:

viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
  [session_id: decimal(29,0), view_product_ids: array<string>, purchase_product_ids: array<string>]
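
For illustration, a DataFrame of the same shape could be built like this (hypothetical data, not my real sessions; assumes import sqlContext.implicits._):

// Hypothetical sessions with viewed and purchased product IDs.
val viewsPurchasesGrouped = Seq(
  (BigDecimal(1), Array("p1", "p2"), Array("p2")),
  (BigDecimal(2), Array("p3"), Array("p3", "p4"))
).toDF("session_id", "view_product_ids", "purchase_product_ids")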

I then calculate frequent patterns and need them in a DataFrame so I can write them to a Hive table.

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())

case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)

val viewsPurchasesDf = freqSequencesRdd.map { fs =>
  val views = fs.sequence(0)(0)
  val purchases = fs.sequence(1)(0)
  val freq = fs.freq
  FreqSequences(views, purchases, freq)
}
viewsPurchasesDf.toDF() // optional

When I try to run this, views and purchases are typed as Any instead of Array[String]. I've desperately tried to convert them, but the best I get is Array[Any]. I think I need to map the contents to String. I've tried, e.g., How to get an element in WrappedArray: result of Dataset.select("x").collect()? and How to cast a WrappedArray[WrappedArray[Float]] to Array[Array[Float]] in spark (scala), and countless other Stack Overflow questions...

I really don't know how to solve this. I guess I'm already converting the initial DataFrame/RDD too much, but I can't see where.

2 Answers


I think the problem is that you have a DataFrame, which retains no static type information. When you take an item out of a Row, you have to tell it explicitly which type you expect to get.
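
For example (a minimal sketch, not taken from your code): plain indexing gives you Any, while the typed getter recovers the element type:

viewsPurchasesGrouped.map { row =>
  val untyped = row(1)                   // static type: Any
  val views = row.getAs[Seq[String]](1)  // static type: Seq[String]; array columns arrive as WrappedArray at runtime
  views
}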

Untested, but inferred from the information you gave:

import scala.collection.mutable.WrappedArray

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray), 
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)

2 Comments

Thank you for your answer! When trying this, I get this error message: viewsPurchasesRddString: org.apache.spark.rdd.RDD[Array[Array[Array[String]]]] = MapPartitionsRDD[1801] at map at <console>:197 prefixSpan: org.apache.spark.mllib.fpm.PrefixSpan = org.apache.spark.mllib.fpm.PrefixSpan@13b756c2 org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 1272.0 failed 4 times, most recent failure: Lost task 28.3 in stage 1272.0 (...): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
Unfortunately I get the same error even with the import.

I solved the problem. For reference, this works:

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    row.getSeq[Long](1).toArray,
    row.getSeq[Long](2).toArray
  )
)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

case class FreqSequences(views: Long, purchases: Long, frequence: Long)

val ps_frequences = model.freqSequences.filter(fs => fs.sequence.length > 1).map { fs =>
  val views = fs.sequence(0)(0)
  val purchases = fs.sequence(1)(0)
  val freq = fs.freq
  FreqSequences(views, purchases, freq)
}

ps_frequences.toDF()
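
With that in place, the DataFrame can be written to Hive as originally intended. A sketch, where the table name is a placeholder and a HiveContext / Hive-enabled session is assumed:

val df = ps_frequences.toDF()
// "my_db.freq_sequences" is a hypothetical target table.
df.write.mode("overwrite").saveAsTable("my_db.freq_sequences")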
