I'm experimenting with core.async on Clojure and ClojureScript to understand how merge works. In particular, I want to know whether merge makes values put on the input channels available to take immediately from the merged channel.

I have the following code:

(ns async-merge-example.core
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   [async-merge-example.exec :as exec]))

(defn async-fn-timeout
  [v]
  (async/go
    (async/<! (async/timeout (rand-int 5000)))
    v))

(defn async-fn-exec
  [v]
  (exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))

(defn merge-and-print-results
  [seq async-fn]
  (let [chans (async/merge (map async-fn seq))]
    (async/go
      (while (when-let [v (async/<! chans)]
               (prn v)
               v)))))

When I try async-fn-timeout with a large-ish seq:

(merge-and-print-results (range 20) async-fn-timeout)

On both Clojure and ClojureScript I get the result I expect: results start printing almost immediately, with the expected delays.

However, when I try async-fn-exec with the same seq:

(merge-and-print-results (range 20) async-fn-exec)

On ClojureScript I get the result I expect: results start printing almost immediately, with the expected delays. On Clojure, however, even though the sh processes are executed concurrently (subject to the size of the core.async thread pool), the results are initially delayed and then mostly printed all at once! I can make this difference more obvious by increasing the size of the seq, e.g. (range 40).

Since the results for async-fn-timeout are as expected on both Clojure and ClojureScript, this points to the differences between the Clojure and ClojureScript implementations of exec.

But I don't know why those differences would cause this issue.

Notes:

  • These observations were made in WSL on Windows 10
  • The source code for async-merge-example.exec is below
  • In exec, the implementation differs for Clojure and ClojureScript due to differences between Clojure/Java and ClojureScript/NodeJS.

(ns async-merge-example.exec
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])))

; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c

; clj implementation based on https://stackoverflow.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure

#?(:cljs (def spawn (.-spawn (js/require "child_process"))))

#?(:cljs
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
       (.on (.-stdout p) "data"  #(async/put! c [:out  (str %)]))
       (.on (.-stderr p) "data"  #(async/put! c [:err  (str %)]))
       (.on p            "close" #(async/put! c [:exit (str %)]))
       c)))

#?(:clj
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan)]
       (async/go
         (let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
               process (.start builder)]
           (with-open [reader (clojure.java.io/reader (.getInputStream process))
                       err-reader (clojure.java.io/reader (.getErrorStream process))]
             (loop []
               (let [line (.readLine ^java.io.BufferedReader reader)
                     err (.readLine ^java.io.BufferedReader err-reader)]
                 (if (or line err)
                   (do (when line (async/>! c [:out line]))
                       (when err (async/>! c [:err err]))
                       (recur))
                   (do
                     (.waitFor process)
                     (async/>! c [:exit (.exitValue process)]))))))))
       c)))

(defn exec
  "executes cmd with args. returns a channel immediately which
   will eventually receive a result map of 
   {:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
  [cmd & args]
  (let [c (exec-chan cmd args)]
    (async/go (loop [output (async/<! c) result {}]
                (if (= :exit (first output))
                  (assoc result :exit (second output))
                  (recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))

1 Answer

Your Clojure implementation does blocking IO on a single thread. The loop reads first from stdout and then from stderr, and each read is a blocking readLine that only returns once it has actually finished reading a line. So unless your process writes the same amount of output to stdout and stderr, one stream will end up blocking the other.

Once the process has finished, readLine no longer blocks and simply returns nil once the buffer is empty. The loop then finishes reading the buffered output and finally completes, which explains the "all at once" messages.

You'll probably want to start a second thread that deals with reading from stderr.

Node does not do blocking IO, so everything happens asynchronously by default and one stream doesn't block the other.
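
As a rough illustration of the "second thread" suggestion, here is a minimal sketch for the :clj side only. The names pump-lines and exec-chan-threads and the namespace are hypothetical, not part of your code. It assumes async/thread (rather than async/go) is acceptable here, so that the blocking readLine calls run on their own threads and each stream is drained independently:

```clojure
(ns async-merge-example.exec-threads ; hypothetical namespace, for illustration only
  (:require [clojure.core.async :as async]
            [clojure.java.io :as io]))

(defn- pump-lines
  "Reads lines from stream on a dedicated thread and puts [tag line] on c.
   Returns a channel that closes once the stream reaches EOF."
  [stream tag c]
  (async/thread
    (with-open [reader (io/reader stream)]
      ;; line-seq blocks on readLine, but only this thread is affected
      (doseq [line (line-seq reader)]
        (async/>!! c [tag line])))))

(defn exec-chan-threads
  "Spawns cmd with args. Routes stdout, stderr and the exit code to a
   channel, reading each stream on its own thread. Returns the channel
   immediately."
  [cmd args]
  (let [c        (async/chan)
        builder  (ProcessBuilder. ^java.util.List (into [cmd] (map str args)))
        process  (.start builder)
        out-done (pump-lines (.getInputStream process) :out c)
        err-done (pump-lines (.getErrorStream process) :err c)]
    (async/thread
      ;; wait until both streams are drained, then report the exit code
      (async/<!! out-done)
      (async/<!! err-done)
      (async/>!! c [:exit (.waitFor process)])
      (async/close! c))
    c))
```

Something like (async/<!! (async/into [] (exec-chan-threads "sh" ["-c" "echo out; echo err 1>&2"]))) should then collect the :out and :err entries (in either order) followed by the :exit entry. Your existing exec function should be able to consume this channel unchanged, since it still stops at the first :exit message.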

3 Comments

I don't think this is correct. While I am indeed using blocking IO, core.async executes the code in go blocks on a pool of 8 threads. I can verify this when I execute the Clojure implementation, as I see 8 concurrent dash processes started immediately when I run the code. This confirms that the work is done in parallel. That said, what I can't account for is the apparent delay in printing any results, even though the sh processes start immediately...
You can run things in parallel just fine but each individual async/go will still block itself. Try adding a prn or so before the (if (or line err) line, you should see that no progress is made until both stdout/stderr receive a full line or the process exits.
Ok, I see what you mean re: stdout/stderr, but in any case, I removed all the code relating to stderr and I still get the same delay in printing. Further, I added (println "process finished for " cmd " with args " args) immediately after (.waitFor process), and I see that output immediately when running. That would suggest the cause has something to do with the core.async channel used to communicate the :exit?
