4

I am facing a concurrency problem in Spark that is keeping me from using it in production, though I believe there is a way around it. I am trying to run Spark ALS on 7 million users and a billion products using order history. I first take a list of distinct users and then loop over them to get recommendations, which is slow and would take days to cover all users. I also tried taking the Cartesian product of users and products to get recommendations for everyone at once, but I would still have to filter and sort the records for each user before feeding them to Elasticsearch, where they are consumed by other APIs.

Please suggest a solution that scales for this use case and can be used in production with real-time recommendations.

Here is my Scala code snippet, which shows how I am currently approaching the problem:

  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))

      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)
3
  • On what level is your concurrency problem? Commented Aug 1, 2015 at 6:31
  • One way Spark and Scala implement parallelization is with parallelized collections. From your code it's not obvious to me that you are using data structures that can be parallelized, but maybe there is a way to do it. For example, what kind of data structure is buy_values? Since you are filtering and mapping its components, it seems that it must be some kind of collection and could possibly be parallelized with parbuy_values = sc.parallelize(buy_values). See spark.apache.org/docs/latest/…. Commented Aug 1, 2015 at 6:49
  • buy_values is a Spark RDD, but user_ids is a list. Iterating over user_ids is what takes the time. This code gives me recommendations at 3 users/sec, which is slow. If I could process multiple users in parallel, either with multithreading on the same Spark context or by other means, it could be made to scale. Commented Aug 1, 2015 at 7:20

2 Answers

2

It is pretty clear that the bottleneck in your program is the search for candidates. Given the Spark architecture, it severely limits your ability to parallelize and adds substantial overhead by starting a Spark job for each user.

Assuming a typical scenario, with 7 million users and a billion products, most of the time you'll predict over the whole range of products minus the few already bought by the user. In my opinion, the important question is why even bother with filtering. Even if you recommend a product that has been previously bought, is it really harmful?

Unless you have very strict requirements, I would simply ignore the problem and use MatrixFactorizationModel.recommendProductsForUsers, which does pretty much all of the work for you, excluding data export. After that you can perform a bulk export and you're good to go.
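A minimal sketch of that approach, assuming Spark 1.4 or later (where recommendProductsForUsers is available). The object name BulkRecommend, the helper names, and the writeBatch sink are hypothetical and only stand in for whatever bulk client you use to reach your datastore:

```scala
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

object BulkRecommend {
  // Top-n products for every user known to the model, computed as one
  // distributed job instead of one Spark job per user.
  def topNForAllUsers(model: MatrixFactorizationModel, n: Int): RDD[(Int, Array[Rating])] =
    model.recommendProductsForUsers(n)

  // Push results out in batches directly from the executors.
  // `writeBatch` is a hypothetical, serializable sink (for example a thin
  // wrapper around a bulk-indexing call); it is not part of Spark.
  def exportInBatches(recs: RDD[(Int, Array[Rating])], batchSize: Int)
                     (writeBatch: Seq[(Int, Array[Rating])] => Unit): Unit =
    recs.foreachPartition { partition =>
      partition.grouped(batchSize).foreach(batch => writeBatch(batch))
    }
}
```

With the names from the question this would be called roughly as BulkRecommend.topNForAllUsers(bestModel.get, 30), followed by exportInBatches with a batch size that suits the target store. Writing per partition keeps the results off the driver, which matters with millions of users.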

Now let's say you have a clear no-duplicates policy. Working under the assumption that a typical user has purchased only a relatively small number of products, you can start by obtaining the set of products for each user:

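import org.apache.spark.mllib.recommendation.Rating

// Collect the set of purchased product IDs for each user
val userProdSet = buy_values
    .map{case Rating(user, product, _) => (user, product)}
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)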

Next you can simply map over userProdSet to get predictions:

// Number of predictions for each user
val nPred = 30

userProdSet.map{case (user, prodSet) => {
    val recommended = model
        // Find recommendations for user; over-fetch by the number of
        // purchased products so that nPred remain after filtering
        .recommendProducts(user, nPred + prodSet.size)
        // Filter to remove already purchased
        .filter(rating => !prodSet.contains(rating.product))
        // Sort by rating, highest first, and limit
        .sortBy(rating => -rating.rating)
        .take(nPred)
    (user, recommended)
}}

You can improve further by using mutable sets for aggregation and by broadcasting the model, but that's the general idea.

If the number of users in user_ids is lower than the number of users in the whole set (buy_values), you can simply filter userProdSet to keep only that subset of users.
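One caveat about the map over userProdSet: MatrixFactorizationModel keeps its user and product factors as RDDs, and Spark does not allow RDD operations inside another RDD's transformation, so calling model.recommendProducts from within that map is likely to fail at runtime. A sketch of a join-based variant of the same idea follows. The object and function names and the fixed over-fetch margin `extra` are assumptions for illustration, not part of the original approach:

```scala
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

object FilteredRecommend {
  // Drop already-purchased products and keep the n best-rated of the rest.
  def dropPurchased(recs: Array[Rating], bought: Set[Int], n: Int): Array[Rating] =
    recs.filter(r => !bought.contains(r.product))
        .sortBy(r => -r.rating)
        .take(n)

  // Over-fetch n + extra recommendations for every user in one job, then
  // join with the per-user purchase sets and filter on the executors.
  // `extra` is a hypothetical fixed margin replacing the per-user
  // prodSet.size, because recommendProductsForUsers takes a single count.
  def recommendUnseen(model: MatrixFactorizationModel,
                      userProdSet: RDD[(Int, Set[Int])],
                      n: Int,
                      extra: Int): RDD[(Int, Array[Rating])] =
    model.recommendProductsForUsers(n + extra)
      .leftOuterJoin(userProdSet)
      .mapValues { case (recs, bought) =>
        dropPurchased(recs, bought.getOrElse(Set.empty[Int]), n)
      }
}
```

The trade-off is that a user with more than `extra` purchased products among their top results may end up with fewer than n recommendations, so the margin should be chosen with the purchase-count distribution in mind.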


0

Spark 1.4 has recommendAll for generating all recommendations so that they can be served through key-value stores.

2 Comments

Could you point to the documentation / code? There is a private method recommendForAll, which is used by the recommendProductsForUsers I've already mentioned, but I haven't seen recommendAll.
Yeah, use recommendProductsForUsers whenever you rerun ALS (say hourly) and upload the recommendations to a serving backend. I am assuming you are using a document datastore like Solr / Elasticsearch or HBase / Cassandra to store the recommendations for serving. It looks like you are using Elasticsearch to serve the results. The topK results are already sorted, so you should not need to do any sorting at all. You just need to keep a flag that the user clicked on the item, at which point the recommendation becomes stale. You should filter out the stale ones until the next version of ALS is run.
