Parallel Processing with core.async

Update August 13, 2041: This approach may now be obsolete with the introduction of pipeline in core.async.

Update April 27, 2015: the date 2041 in the previous update is a typo, but updates from the future ties in nicely with the async theme so I decided to leave it in.

✻ ✻ ✻

Say you have a bunch of items to process, and you want to parallelize the work across N threads. Using core.async, one obvious way to do this is to create N go blocks, all reading from the same input channel.

(defn parallel
  "Processes values from input channel in parallel on n 'go' blocks.

  Invokes f on values taken from input channel. Values returned from f
  are written on output channel.

  Returns a channel which will be closed when the input channel is
  closed and all operations have completed.

  Note: the order of outputs may not match the order of inputs."
  [n f input output]
  (let [tasks (doall
               (repeatedly n
                #(go-loop []
                   (let [in (<! input)]
                     (when-not (nil? in)
                       (let [out (f in)]
                         (when-not (nil? out)
                           (>! output out))
                         (recur)))))))]
    (go (doseq [task tasks]
          (<! task)))))

This might create more go blocks than you need, but inactive go blocks don’t cost much except a little memory.

But this isn’t always ideal: if f is going to block or do I/O, then you might want to create a thread instead of a go. Threads are more expensive. Suppose you don’t know how quickly the inputs will arrive: you might end up creating more threads than you need.

What I typically want is to process things in parallel with as many threads as necessary, but at most N. If the processing with two threads is fast enough to keep up with the input, then we should only create two threads. This applies to other kinds of resources besides threads, network calls for example.

After many attempts, here is what I came up with:

(defn pmax
  "Process messages from input in parallel with at most max concurrent
  operations.

  Invokes f on values taken from input channel. f must return a
  channel, whose first value (if not closed) will be put on the output
  channel.

  Returns a channel which will be closed when the input channel is
  closed and all operations have completed.

  Creates new operations lazily: if processing can keep up with input,
  the number of parallel operations may be less than max.

  Note: the order of outputs may not match the order of inputs."
  [max f input output]
  (go-loop [tasks #{input}]
    (when (seq tasks)
      (let [[value task] (alts! (vec tasks))]
        (if (= task input)
          (if (nil? value)
            (recur (disj tasks task))  ; input is closed
            (recur (conj (if (= max (count tasks))  ; max - 1 tasks running
                           (disj tasks input)  ; temporarily stop reading input
                           tasks)
                         (f value))))
          ;; one processing task finished: continue reading input
          (do (when-not (nil? value) (>! output value))
              (recur (-> tasks (disj task) (conj input)))))))))

The function f is responsible for both processing the input and creating the response channel. So f could be a go, a thread, or something else that returns a channel, such as an asynchronous I/O operation. There’s a little bit of extra overhead to shuffle around data structures in this go-loop, but I’m assuming that the cost of processing inputs will dominate.

So how to test it? First a few helpers.

We want to make sure that the output channel doesn’t hold up anything else, so we’ll make a helper function to consume everything from it:

(defn sink
  "Returns an atom containing a vector. Consumes values from channel
  ch and conj's them into the atom."
  [ch]
  (let [a (atom [])]
    (go-loop []
      (let [val (<! ch)]
        (when-not (nil? val)
          (swap! a conj val)
          (recur))))
    a))

What we want to keep track of is how many parallel operations are running at any given time. We can have our “processing” function increment a counter when it starts, wait a random interval of time, then decrement the counter before returning.

My colleague @timbaldridge suggested a watch function to keep track of how high the counter gets. This will produce a record of how many tasks were active at any time during the test.

(defn watch-counter [counter thread-counts]
  (add-watch counter
             :thread-count
             (fn [_ _ _ thread-count]
               (swap! thread-counts conj thread-count))))

Here’s the pmax function using a go block:

(deftest t-pmax-go
  (let [input (to-chan (range 50))
        output (chan)
        result (sink output)
        max-threads 5
        counter (atom 0)
        f (fn [x]
            (go
             (swap! counter inc)
             (<! (timeout (rand-int 100)))
             (swap! counter dec)
             x))
        thread-counts (atom [])]
    (watch-counter counter thread-counts)
    (<!! (pmax max-threads f input output))
    (is (= (set (range 50)) (set @result)))
    (is (every? #(<= % max-threads) @thread-counts))))

And pmax using a thread:

(deftest t-pmax-thread
  (let [input (to-chan (range 50))
        output (chan)
        result (sink output)
        max-threads 5
        counter (atom 0)
        f (fn [x]
            (thread
             (swap! counter inc)
             (<!! (timeout (rand-int 100)))
             (swap! counter dec)
             x))
        thread-counts (atom [])]
    (watch-counter counter thread-counts)
    (<!! (pmax max-threads f input output))
    (is (= (set (range 50)) (set @result)))
    (is (every? #(<= % max-threads) @thread-counts))))

But what we really wanted to know is that pmax won’t create more threads than necessary when the input source is slower than the processing. Here’s that test, with a deliberately slow input channel:

(deftest t-pmax-slow-input
  (let [input (chan)
        output (chan)
        result (sink output)
        max-threads 5
        actual-needed-threads 3
        counter (atom 0)
        f (fn [x]
            (go
             (swap! counter inc)
             (<! (timeout (rand-int 100)))
             (swap! counter dec)
             x))
        thread-counts (atom [])]
    (watch-counter counter thread-counts)
    ;; Slow input:
    (go-loop [i 0]
      (if (< i 50)
        (do (<! (timeout 50))
            (>! input i)
            (recur (inc i)))
        (close! input)))
    (<!! (pmax max-threads f input output))
    (is (= (set (range 50)) (set @result)))
    (is (every? #(<= % actual-needed-threads) @thread-counts))))

This still isn’t suitable for every scenario: maybe each thread needs some expensive set-up before it can process inputs. Then you would need some more elaborate mechanism to keep track of how many threads you have and whether or not they are keeping up with the input.

I have released the code in this blog post under the MIT License. View the source code on GitHub.

Update January 1, 2014: As my colleague @craigandera pointed out, this code doesn’t do any error handling. It’s easy enough to add once you make a decision about how to handle errors: ignore, log, or abort the whole process.