
I use a Java library that makes non-async GET and POST requests. I used to wrap such requests in futures, which solved the "waiting problem" for me (that is, waiting for the response):

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [res (atom [])]
    (dotimes [i n]
      (future (swap! res conj (unchangeable-lib-request i))))
    ;; busy-wait until all n responses have arrived
    (loop []
      (if (> n (count @res))
        (recur)
        @res))))

(time (process 9))

;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]

But I need to make hundreds of requests, and this creates performance problems. I found out about core.async and go blocks. But if I use go blocks with this library, it will not solve the "waiting problem":

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (unchangeable-lib-request i))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))

;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]

Go blocks are backed by a fixed-size thread pool (8 threads by default), so blocking calls inside them can handle only 8 requests simultaneously. Is there a way to write an async wrapper that parks the go block and makes it possible to run hundreds of requests asynchronously without them blocking each other?

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (magic-async-parking-wrapper
                      (unchangeable-lib-request i)))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))

;; "Elapsed time: 1003.2563 msecs"

I know about async/thread, but it seems to be the same as (future ...).
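(For comparison, a minimal sketch of the two, reusing the stub from above. The main practical difference is that async/thread returns a channel, so a go block can park on the result instead of blocking a thread on deref:)

```clojure
(require '[clojure.core.async :as async])

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

;; future returns a dereferenceable value; @f blocks the calling thread
(def f (future (unchangeable-lib-request 1)))
@f

;; async/thread also runs its body on a separate (cached-pool) thread,
;; but returns a channel, so a go block can park on it with <!
(def c (async/thread (unchangeable-lib-request 2)))
(async/go (println (async/<! c)))
```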

Is it possible?

  • Have you read this? Commented Oct 20, 2017 at 13:29
  • What are you pointing to? I didn't find anything useful there. Do you mean that using (Thread/sleep _) is not a good idea? That is exactly the point: I already have a blocking operation, I can't change it, and I need to do something with it. Commented Oct 21, 2017 at 7:57

2 Answers


I'd suggest:

  • Use futures to create the threads, and have them put the results onto a core.async channel from outside of any go block using put!, something like: (future (put! chan (worker-function)))
  • Then use a go block to wait on that (single) channel, collecting the results as they arrive.
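A minimal sketch of that approach, reusing the unchangeable-lib-request stub from the question (channel size and the go-loop collector are my choices, not part of the answer as written):

```clojure
(require '[clojure.core.async :as async :refer [chan put! go-loop <! <!!]])

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (chan n)]
    ;; Each future runs the blocking call on its own thread and
    ;; delivers the result with put!, which never blocks the caller.
    (dotimes [i n]
      (future (put! c (unchangeable-lib-request i))))
    ;; A single go block collects the n results; <! parks instead of
    ;; tying up a pool thread while waiting.
    (<!! (go-loop [result []]
           (if (< (count result) n)
             (recur (conj result (<! c)))
             result)))))

(time (process 9))
;; completes in roughly 1 second; result order is nondeterministic
```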

5 Comments

Hm, why put! instead of >!!? I don't totally understand the difference between the two, but I think the point of put! is to allow you to do fancier error handling if the channel has no buffer space. Since you just want to wait for space, I would have expected >!!.
What is the difference between my first implementation and your answer? If I want to make 100 requests at once, the program will create 100 futures that each wait for their own request until the response arrives. That is not asynchronous. Am I misunderstanding something?
@amalloy the difference is that if the channel c has no space, >!! will block a thread and wait for an opportunity to put a value into c, whereas put! will not block the thread and will put the value asynchronously when there's an opportunity. But yes, it seems that in this case there's no difference.
put! can be used from outside of a go block, whereas the arrow functions can't.
@tarmes The >! and <! arrow functions can't be used outside of a go block. But >!! and <!! can be, though they will block threads.

This is where you use clojure.core.async/pipeline-blocking

(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])

(let [output-chan (chan 100)
      input-chan  (chan 1000)]
  (pipeline-blocking 4 ; parallelism knob
                     output-chan
                     (map unchangeable-lib-request)
                     input-chan)
  ;; Consume results from output-chan, put operations on input-chan
  [output-chan input-chan])

This spawns n (in this case 4) threads that are kept busy executing unchangeable-lib-request.

Use the buffer size of output-chan to fine-tune how many requests you want to happen in advance.

Use the buffer size of input-chan to fine-tune how many requests you want scheduled without backpressure (a blocking input-chan).
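Putting this together, here is an end-to-end sketch that feeds the input channel and collects the results. It reuses the question's unchangeable-lib-request stub and assumes core.async 1.2+ for onto-chan! (older versions spell it onto-chan); the buffer sizes and parallelism argument are illustrative choices:

```clojure
(require '[clojure.core.async :as a])

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n parallelism]
  (let [input-chan  (a/chan n)
        output-chan (a/chan n)]
    ;; parallelism threads run the blocking transducer step
    (a/pipeline-blocking parallelism
                         output-chan
                         (map unchangeable-lib-request)
                         input-chan)
    ;; Feed the work items; onto-chan! closes input-chan when done,
    ;; which shuts the pipeline (and output-chan) down cleanly.
    (a/onto-chan! input-chan (range n))
    ;; Collect everything from output-chan into a vector.
    (a/<!! (a/into [] output-chan))))

(time (process 9 9))
;; with parallelism 9, nine 1-second requests finish in roughly 1 second
```

Note that pipeline-blocking preserves ordering, so unlike the future-based versions the results come back in input order.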

8 Comments

Yay! Nice solution, thanks! But is there a performance difference between this method and using futures? As I understand it, N threads (= futures) will be created to handle the blocking operations. Is it impossible to wait for blocking requests using just 1-2 threads with parking?
@Defake Not sure I understand your question, but you can change 4 to 2 or 1 and then there will only be 2 threads or 1.
@Defake What exactly do you intend to do with parking?
You can, if you want, add a backchannel. Put requests into input-chan like this: {:request-data "whatever-the-java-thing-needs", :response (promise-chan)}. Then make unchangeable-lib-request put the response on :response via >!!. Then do (go (let [resp (promise-chan)] (>! input-chan {:request-data ..., :response resp}) (<! resp))). This block will be parked until the result is calculated. Make sure to use a dropping-buffer for output-chan, though.
The example program runs exactly 4 threads, no more and no fewer. They are shut down once you close input-chan. This is perfectly fine, JVM-wise. What is problematic is having one thread per request, because the threads will block each other.
