
I have a sequence of customers that needs to be processed in parallel. I tried to use pmap for that, but the result is painfully slow, much slower than a sequential implementation. The inner function process-customer runs a transaction. Obviously, pmap launches all the transactions at once and they end up retrying, which kills performance. What is the best way to parallelize this?

(defn process-customers [customers]
  (doall
    (pmap
      ;; each pmap task processes one batch of up to 10 customers sequentially
      (fn [sub-customers]
        (doseq [customer sub-customers]
          (process-customer customer)))
      (partition-all 10 customers))))

EDIT: The process-customer function involves the steps below; for brevity I list the steps rather than the code. All the steps are inside a transaction to ensure that another parallel transaction does not cause inconsistencies such as negative stock.

(defn- process-customer
  "Process `customer`. Consists of three steps:
  1. Finding all stores in which the requested products are still available.
  2. Sorting the found stores to find the cheapest (for the sum of all products).
  3. Buying the products by updating the `stock`."
  [customer]
  (dosync
    ;; steps 1-3 (elided in the question) all run inside this transaction
    ))
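To make the contention concrete, here is a minimal sketch of those three steps under an assumed data model: a single ref, `stock`, holding every store's quantities, plus a static `prices` map. Both names, and the customer shape `{:products {product qty}}`, are hypothetical and not the asker's actual code. Because every transaction alters the same ref, any two overlapping transactions conflict and one of them retries, so running them under pmap is unlikely to beat running them one at a time.

```clojure
;; HYPOTHETICAL data model (not from the question): one ref for all stock,
;; shaped {store {product qty}}, plus static prices {store {product price}}.
(def stock (ref {:store-a {:apple 3 :pear 1}
                 :store-b {:apple 5 :pear 4}}))

(def prices {:store-a {:apple 2 :pear 3}
             :store-b {:apple 3 :pear 3}})

(defn- process-customer
  "Buys every product in (:products customer), a map of product -> qty,
  from the cheapest store that stocks all of them. Returns that store or nil."
  [{:keys [products]}]
  (dosync
    (let [all-stock @stock
          ;; 1. stores where every requested product is still available
          in-stock? (fn [store]
                      (every? (fn [[p q]] (>= (get-in all-stock [store p] 0) q))
                              products))
          ;; 2. total price of the whole basket at a given store
          total (fn [store]
                  (reduce + (map (fn [[p q]] (* q (get-in prices [store p])))
                                 products)))
          best (first (sort-by total (filter in-stock? (keys all-stock))))]
      ;; 3. buy by decrementing stock; every customer alters this one ref,
      ;; so concurrent transactions conflict and retry
      (when best
        (alter stock (fn [s]
                       (reduce (fn [acc [p q]] (update-in acc [best p] - q))
                               s products)))
        best))))
```

The STM keeps the stock consistent (it never goes negative), but it does so by effectively serializing every customer on the one ref.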

EDIT 2: The version of process-customers below has the same performance as the parallel process-customers above, even though it is plainly sequential.

(defn process-customers
  "Process `customers` one by one. In this code, this happens sequentially."
  [customers]
  (doseq [customer customers]
    (process-customer customer)))
  • Without knowing anything about what processing entails, what the expected performance characteristics are, etc., I don't know how this could be expected to be answerable. Please try to provide a minimal reproducible example -- the shortest possible code someone else can run to model the problem and test their proposed solutions. Commented Apr 29, 2019 at 23:41
  • (A good solution might, for example, consist of making the transaction's logic commutative; of course, we can't tell whether that's feasible unless we can see it.) Commented Apr 29, 2019 at 23:44
  • @CharlesDuffy, I have added an edit. Note that this is a transaction that looks through a series of stores and picks the cheapest one. If a customer wants to buy more than one product, he must buy all of them from the cheapest store. That might be impossible to formulate as commutative. The stock of that store is then decreased. Of course, we do not want negative stock. Commented Apr 30, 2019 at 0:01
  • BTW -- what specific operation makes this slow, and thus makes parallelization important? Is there an external (database or other I/O-operation) lookup? Commented Apr 30, 2019 at 0:10
  • There is no long-running process. That seems to be the question you are asking :). I am merely logging to the screen, and I can also turn off the logging. I was thinking that, because I am processing many customers, parallelization would result in a performance boost. The shared resource, stock, which has to be kept consistent, seems to be the bottleneck. Commented Apr 30, 2019 at 0:47

1 Answer


I assume your transaction locks the inventory for the full life cycle of process-customer. This will be slow because all customers are racing for the same universe of stores. If you can split the process into two phases, 1) quoting and 2) fulfilling, and apply the transaction only to (2), performance should be much better. Alternatively, if you buy into agent programming, the transaction boundary is defined for you automatically at the message level. Here is one sample you can consider:
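For comparison with the agent sample, here is a rough sketch of the same two-phase idea using refs instead, with one ref per store (closer to a row lock than a table lock). The store shape `{:prices {...} :stock {...}}` and every function name here are hypothetical. Quoting only dereferences refs outside any transaction; the transaction in `fulfill-phase!` touches a single store and re-checks its stock, so customers buying from different stores do not conflict with each other.

```clojure
;; HYPOTHETICAL model: one ref per store ("row lock" granularity), each
;; holding {:prices {product price} :stock {product qty}}.
(defn make-stores
  "Wraps each store's state map in its own ref."
  [m]
  (into {} (map (fn [[id state]] [id (ref state)]) m)))

(defn- can-supply? [state products]
  (every? (fn [[p q]] (>= (get-in state [:stock p] 0) q)) products))

(defn- basket-total [state products]
  (reduce + (map (fn [[p q]] (* q (get-in state [:prices p]))) products)))

(defn quote-phase
  "Phase 1, outside any transaction: reads each store's current value and
  returns the ids of stores that can supply `products`, cheapest first."
  [stores products]
  (->> stores
       (keep (fn [[id r]]
               (let [state @r]
                 (when (can-supply? state products)
                   [id (basket-total state products)]))))
       (sort-by second)
       (map first)))

(defn fulfill-phase!
  "Phase 2: a short transaction on one store's ref only. Re-checks stock,
  since it may have changed after the quote. Returns true on success."
  [stores id products]
  (let [r (get stores id)]
    (dosync
      (when (can-supply? @r products)
        (alter r update :stock
               (fn [stock]
                 (reduce (fn [acc [p q]] (update acc p - q)) stock products)))
        true))))

(defn process-customer-two-phase
  "Tries the quoted stores cheapest-first; returns the store id used, or nil."
  [stores {:keys [products]}]
  (some (fn [id] (when (fulfill-phase! stores id products) id))
        (quote-phase stores products)))
```

Because stock only ever decreases in this sketch, a store that could not supply the order at quote time cannot supply it later, so trying the quoted stores cheapest-first should still end at the cheapest store that currently has everything. If restocking or price changes can happen concurrently, that reasoning no longer holds.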

(defn get-best-deal
  "Returns the best deal for a given order with given stores (agent)"
  [stores order]
  ;;
  ;; request for quotation from 1000 stores (in parallel)
  ;;
  (doseq [store stores]
    (send store get-quote order))
  ;;
  ;; wait for reply, up to 0.5s
  ;;
  (apply await-for 500 stores)
  ;;
  ;; sort and find the best store
  ;;
  (when-let [best-store (->> stores
                             (filter (fn [store] (get-in @store [:quotes order])))
                             (sort-by (fn [store] (->> (get-in @store [:quotes order])
                                                       vals
                                                       (reduce +))))
                             first)]
    {:best-store best-store
     :invoice-id (do
                   ;; execute the order
                   (send best-store fulfill order)
                   ;; wait for the transaction to complete
                   (await best-store)
                   ;; get an invoice id
                   (get-in @best-store [:invoices order]))}))

To find the best deals from 1,000 stores for 100 orders (289 line items in total) over 100 products:

(->> orders
     (pmap (partial get-best-deal stores))
     (filter :invoice-id)
     count
     time)
;; => 57
;; "Elapsed time: 312.002328 msecs"

Sample business logic:

(defn get-quote
  "issue a quote by checking inventory"
  [store {:keys [order-items] :as order}]
  (if-let [quote (->> order-items
                   (reduce reduce-inventory
                           {:store store
                            :quote nil})
                   :quote)]
    ;; has inventory to generate a quote
    (assoc-in store [:quotes order] quote)
    ;; no inventory
    (update store :quotes dissoc order)))
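The answer does not show `reduce-inventory`. Judging from how `get-quote` uses it (an accumulator `{:store store :quote nil}` whose `:quote` must end up nil when the store cannot quote, and whose values `get-best-deal` later sums), one possible version is sketched below. It assumes, hypothetically, that a store's state holds `:inventory` as `{product {:qty n :price p}}` and that each order item is `{:product p :qty n}` with each product listed once.

```clojure
(defn reduce-inventory
  "HYPOTHETICAL helper (not shown in the answer). Reducing fn for get-quote:
  prices one order item against the store's inventory. On a shortfall it
  stops the reduction with a nil :quote, so the store gives no quote."
  [{:keys [store] :as acc} {:keys [product qty]}]
  (let [{available :qty price :price} (get-in store [:inventory product])]
    (if (and available (>= available qty))
      ;; quote is a map of product -> line cost; get-best-deal sums its vals
      (assoc-in acc [:quote product] (* qty price))
      (reduced (assoc acc :quote nil)))))
```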

(defn fulfill
  "fulfill an order if previously quoted"
  [store order]
  (if-let [quote (get-in store [:quotes order])]
    ;; check inventory again and generate invoice
    (let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
      (cond-> store
        invoice (->
                  ;; register invoice
                  (assoc-in [:invoices order] invoice)
                  ;; invalidate the quote
                  (update :quotes dissoc order)
                  ;; update inventory
                  (assoc :inventory inventory'))))
    ;; not quoted before
    store))
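`check-inventory-and-generate-invoice` is not shown either. A hypothetical version, assuming a store's `:inventory` is `{product {:qty n :price p}}` and order items are `{:product p :qty n}`, re-checks every item against the current inventory and returns `[invoice inventory']`. Here the invoice is just a random UUID string (an assumption); on a shortfall it returns `[nil inventory]`, which makes `fulfill` leave the store untouched.

```clojure
(defn check-inventory-and-generate-invoice
  "HYPOTHETICAL helper (not shown in the answer). Returns [invoice inventory']
  if every item is still in stock, otherwise [nil inventory]."
  [{:keys [inventory]} {:keys [order-items]}]
  (let [inventory' (reduce (fn [inv {:keys [product qty]}]
                             (if (>= (get-in inv [product :qty] 0) qty)
                               ;; deduct this line item from the stock
                               (update-in inv [product :qty] - qty)
                               ;; any shortfall aborts the whole order
                               (reduced nil)))
                           inventory
                           order-items)]
    (if inventory'
      [(str (java.util.UUID/randomUUID)) inventory']
      [nil inventory])))
```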



3 Comments

The stores are now agents. I have two questions. 1. If you then run several threads, won't their quote and fulfil requests still be executed one at a time? 2. If that is the case, isn't this the same as a sequential implementation?
From a store's perspective, the unit of work is done once a quote is out. The store can then switch to serving another customer/order right away. Customers can take their time making their own purchase decisions without blocking any store. This simply mirrors how multitasking works in a real-life scenario.
By the way, agent programming is just one way to model the solution. You can achieve similar throughput with your approach if you apply the appropriate locking granularity: something like a row lock (one store) vs. a table lock (the universe of stores).
