I am trying to use Clojure core.async channels to throttle memory-intensive concurrent processes. Each process loads an image into memory and applies a watermark. If I process too many images concurrently, I get OOM errors.

The pattern below seems to work, but it feels inelegant. Is there a better way to do this with core.async? Or should I just use Java's concurrency utilities instead (e.g., create a fixed-size thread pool)?

The basic idea in the code below is to use a global fixed-size channel, tchan, to throttle what goes into in-chan, which limits the number of concurrent processes to the size of tchan's buffer.
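Stripped of the image-specific parts, the throttling idea is a channel used as a pool of permits. The following is only a minimal sketch of that idea, not the code from this question: run-throttled, n, f, and items are hypothetical names, and f stands in for the real work.

```clojure
(require '[clojure.core.async :refer [chan thread <!! >!!]])

(defn run-throttled
  "Calls (f item) for each item on its own thread, with at most n calls
  in flight at once. Returns a vector of results in input order.
  Hypothetical helper for illustration only."
  [n f items]
  (let [tokens  (chan n)                          ; n buffer slots = n permits
        workers (mapv (fn [item]
                        (>!! tokens :token)       ; blocks while n calls are running
                        (thread
                          (try
                            (f item)
                            (finally
                              (<!! tokens)))))    ; give the permit back
                      items)]
    ;; each `thread` returns a channel that yields the body's result
    (mapv <!! workers)))
```

Acquiring the permit before starting the thread and releasing it in a finally block keeps the permit count accurate even if f throws.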

In the code below, process-images is the entry point.

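;; core.async functions used below; log/, watermark/ and timeout-ms are
;; assumed to be defined elsewhere in the project
(require '[clojure.core.async :refer [<! >! >!! alts!! buffer chan go thread timeout]])

(def tbuff (buffer 20))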

(def tchan
  "tchan is used to throttle the number of processes
  tbuff is a fixed size buffer"
  (chan tbuff))

(defn accum-results
  "Accumulates the images in results-chan"
  [n result-chan]
  (let [chans [result-chan (timeout timeout-ms)]]
    (loop [imgs-out  []
           remaining n]
      (if (zero? remaining)
        imgs-out
        (let [[img-result _] (alts!! chans)]
          (if (nil? img-result)
            (do
              (log/warn "Image processing timed out")
              (go (dotimes [_ remaining] (<! tchan)))
              imgs-out)
            (do
              (go (<! tchan))
              (recur (conj imgs-out img-result) (dec remaining)))))))))

(defn process-images
  "Concurrently watermarks a list of images
  Images is a sequence of maps representing image info
  Concurrently fetches each actual image and applies the watermark
  Returns a map of image info map -> image input stream"
  [images]
  (let [num-imgs (count images)
        in-chan  (chan num-imgs)
        out-chan (chan num-imgs)]
    ;; set up the image-map consumer
    ;; asynchronously process things found on in-chan
    (go
      (dotimes [_ num-imgs]
        ;; park here until an input image is available
        (let [img-in (<! in-chan)]
          (thread
            (let [img-out (watermark/watermarked-image-is img-in)]
              (>!! out-chan [img-in img-out]))))))
    ;; put images on in-chan
    (go
      (doseq [img images]
        (>! tchan :x)
        (>! in-chan img)))
    ;; accum results
    (let [results (accum-results num-imgs out-chan)]
      (log/info (format "Processed %s of %s images and tbuff is %s"
                        (count results) num-imgs (count tbuff)))
      (into {} results))))

1 Answer

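I believe this is exactly what pipeline is for: it applies a transducer to the values from an input channel with a fixed degree of parallelism and puts the results on an output channel in input order.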

And here's an example:

user> (require '[clojure.core.async :refer [<! <!! chan go go-loop pipeline pipeline-blocking pipeline-async] :as async])

user> (let [output (chan)
            input (async/to-chan (range 10))]
        (go-loop [x (<! output)]
          (println x))
        (pipeline 4
                  output
                  (map #(do
                          (Thread/sleep (rand-int 200))
                          (println "starting" %)
                          (Thread/sleep 1000)
                          (println "finished" %)
                          (inc %)))
                  input))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f434b5a "clojure.core.async.impl.channels.ManyToManyChannel@3f434b5a"]
user> starting 0
starting 3
starting 1
starting 2
finished 0
1
finished 3
finished 1
finished 2
starting 4
starting 5
starting 6
finished 4
finished 5
finished 6
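Note that in the transcript above only one result (1) is printed and no work starts after item 6. This appears to be because the go-loop has no recur, so it takes a single value, after which the unbuffered output channel applies back-pressure to the pipeline. Here is a sketch that drains every result instead. It assumes the work is blocking (such as loading an image), so it uses pipeline-blocking; process-all, n, f, and items are hypothetical names, not part of core.async.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! pipeline-blocking]])

(defn process-all
  "Applies (f item) to every item with at most n calls running at once
  and returns a vector of results in input order.
  f is a placeholder for the blocking work and must not return nil,
  because nil cannot be put on a channel."
  [n f items]
  (let [in  (async/to-chan items)        ; closes itself after the last item
        out (chan n)]
    (pipeline-blocking n out (map f) in) ; closes out once in is exhausted
    (<!! (async/into [] out))))          ; collect every result, then return
```

For the question's use case, something like (into {} (process-all 20 (fn [img] [img (watermark/watermarked-image-is img)]) images)) should give the image info map -> input stream result, with 20 matching the size of tbuff.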

1 Comment

In the example I edited in here, it's worth noting that the call to map is given only a function, so it returns a transducer rather than a lazy sequence; it just looks like the sequence version.
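A small sketch of that difference, using only clojure.core and core.async functions (the var names are just for illustration):

```clojure
(require '[clojure.core.async :as async :refer [chan <!!]])

;; With a collection argument, map returns a lazy sequence.
(def lazy-result (map inc [1 2 3]))

;; With only a function, map returns a transducer: a function that
;; transforms a reducing step and has no collection attached yet.
(def xf (map inc))

;; The same transducer can drive an eager collection build...
(def eager-result (into [] xf [1 2 3]))   ; => [2 3 4]

;; ...or a channel, which applies it to every value that passes through.
(def chan-result
  (<!! (async/into [] (async/pipe (async/to-chan [1 2 3])
                                  (chan 3 xf)))))   ; => [2 3 4]
```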
