
I'm working with Apache Spark's ALS model, and the recommendForAllUsers method returns a dataframe with the schema

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

In practice, the recommendations are a WrappedArray like:

WrappedArray([636958,0.32910484], [995322,0.31974298], [1102140,0.30444127], [1160820,0.27908015], [1208899,0.26943958])

I'm trying to extract just the item_ids and return them as a 1D array. So the above example would be [636958,995322,1102140,1160820,1208899]

This is what's giving me trouble. So far I have:

    val numberOfRecs = 20
    val userRecs = model.recommendForAllUsers(numberOfRecs).cache()

    val strippedScores = userRecs.rdd.map(row => {
      val user_id = row.getInt(0)
      val recs = row.getAs[Seq[Row]](1)

      val item_ids = new Array[Int](numberOfRecs)

      recs.toArray.foreach(x => {
        item_ids :+ x.get(0)
      })

      item_ids
    })

But this just returns [I@2f318251, and if I get the string value of it via mkString(","), it returns 0,0,0,0,0,0

Any thoughts on how I can extract the item_ids and return them as a separate, 1D array?

Comment: can you paste the input df or the rdd? – Nov 13, 2018 at 18:16

2 Answers


Found in the Spark ALSModel docs that recommendForAllUsers returns

"a DataFrame of (userCol: Int, recommendations), where recommendations are stored as an array of (itemCol: Int, rating: Float) Rows" (https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.ml.recommendation.ALSModel)

By array, it means WrappedArray, so instead of trying to cast it to Seq[Row], I cast it to mutable.WrappedArray[Row]. I was then able to get each item_id like:

    val userRecItems = userRecs.rdd.map(row => {
      val user_id = row.getInt(0)
      val recs = row.getAs[mutable.WrappedArray[Row]](1)

      for (rec <- recs) {
        val item_id = rec.getInt(0)
        userRecommendations += item_id
      }
    })

where userRecommendations was a mutable ArrayBuffer
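For reference on why the original attempt printed zeros: `Array.:+` builds and returns a *new* array, leaving the receiver untouched, whereas `ArrayBuffer.+=` appends in place. A minimal plain-Scala sketch (no Spark needed):

```scala
import scala.collection.mutable

// An Int array starts out zero-filled, as in the question
val itemIds = new Array[Int](3)

// :+ constructs a NEW 4-element array; the result is discarded here,
// so itemIds itself never changes — hence the 0,0,0 output
itemIds :+ 42
assert(itemIds.mkString(",") == "0,0,0")

// ArrayBuffer.+= mutates the buffer in place, which is why
// switching to an ArrayBuffer collects the ids correctly
val buffer = mutable.ArrayBuffer[Int]()
buffer += 42
buffer += 7
assert(buffer.mkString(",") == "42,7")
```

Note, though, that appending to a driver-side buffer from inside `rdd.map` only behaves this way in local mode; on a cluster each executor mutates its own copy of the closure.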




You can use a fully qualified name to access a structure element in the array:

scala> case class Recommendation(item_id: Int, rating: Float)
defined class Recommendation

scala> val userReqs = Seq(Array(Recommendation(636958,0.32910484f), Recommendation(995322,0.31974298f), Recommendation(1102140,0.30444127f), Recommendation(1160820,0.27908015f), Recommendation(1208899,0.26943958f))).toDF
userReqs: org.apache.spark.sql.DataFrame = [value: array<struct<item_id:int,rating:float>>]

scala> userReqs.printSchema
root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: integer (nullable = false)
 |    |    |-- rating: float (nullable = false)


scala> userReqs.select("value.item_id").show(false)
+-------------------------------------------+
|item_id                                    |
+-------------------------------------------+
|[636958, 995322, 1102140, 1160820, 1208899]|
+-------------------------------------------+

scala> val ids = userReqs.select("value.item_id").collect().flatMap(_.getAs[Seq[Int]](0))
ids: Array[Int] = Array(636958, 995322, 1102140, 1160820, 1208899)
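The same fully-qualified access should also work directly on the DataFrame returned by recommendForAllUsers, without dropping to the RDD API. A sketch, assuming userRecs is the output of model.recommendForAllUsers (column names as in the question's schema):

```scala
import org.apache.spark.sql.functions.col

// Selecting "recommendations.item_id" projects each array of
// (item_id, rating) structs down to an array of just the item_id field,
// keeping one row per user.
val idsPerUser = userRecs.select(col("user_id"), col("recommendations.item_id"))
// Resulting schema: user_id: int, item_id: array<int>
```

This keeps the extraction in the DataFrame API, so Catalyst can optimize it and no per-row Row casting is needed.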

