Using Transducers with Core.async in ClojureScript

I was working on a pretty big post last year when I was sidelined with a new job and started writing a book, Learning ClojureScript. Working on the book has given me a lot of opportunities to explore areas of the ClojureScript ecosystem that I wouldn't otherwise have explored, and some of the learnings that have resulted have been pretty cool.

Specifically, two of the things I've been learning about lately are core.async and transducers. Core.async is a highly popular Clojure(Script) library for embedding CSP-style concurrent program design in libraries and applications; transducers are composable functions useful for performing transformations on data.

There are a number of articles about core.async already floating around on the internet, but transducers appear to remain largely misunderstood. Speaking personally, I've had quite a bit of trouble wrapping my mind around them, and the thing that has helped me most has been getting a better understanding of their specific use cases.

In general, I think the fundamental insight for understanding transducers is to realize that their application is not so different from other higher-order functions, but that they're applied to slightly different (though no less valuable) contexts. Thus far, the most awesome examples I've seen have been ones in which transducers were used in combination with core.async to build data transformations into asynchronous application message queues, so that's what I'll focus on here.
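
To make that concrete before we bring core.async into it, here is a small sketch that only uses clojure.core. The names odd-squares, as-vector, as-seq and total are made up for illustration. It defines one composed transducer and then applies the very same value in three different contexts:

```clojure
;; A transducer built by composing two simpler ones. With `comp`,
;; the steps apply left to right: keep odd numbers, then square them.
(def odd-squares
  (comp (filter odd?)
        (map #(* % %))))

;; 1. Eagerly, pouring the results into a vector
(def as-vector (into [] odd-squares (range 10)))

;; 2. Lazily, producing a sequence
(def as-seq (sequence odd-squares (range 10)))

;; 3. As part of a reduction, summing the results
(def total (transduce odd-squares + 0 (range 10)))
```

Nothing about odd-squares says where the values come from or where they end up, which is what lets us hand a transducer to something like a channel later on.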

Let's get into it.

Error Handling Patterns

One of the advantages of core.async is that it encourages programs with a clear separation of concerns. Data is produced in one part of your application, and consumed and processed in another.

This separation of concerns has a drawback, however. If we have code that expects to be able to read from a channel, what happens when errors occur on the input side of the channel? We could catch and handle errors on the input side, but there are a number of circumstances in which that might be undesirable. For instance, what if there are many possible places in the codebase in which we put values into this channel? It is not unreasonable to imagine that we might want a central pipeline for handling errors.

There are a few different ways of going about this. One possibility would be to create a separate channel for error messages and to have an independent listener for that channel (a pattern discussed on the ClojureScript mailing list here).

Let’s assume we have an application where we're regularly polling the kitten factory to make sure production is running smoothly. If things are running smoothly, we'll create a report, and if they go badly, we'll log that to our exception tracking service.

Side note: What even happens at the kitten factory? Is it a factory that makes kittens, or do kittens run the factory? If they’re running the factory, what are they making – balls of yarn? I have trouble imagining this is a very efficient factory.

Such code might look like the following:

(ns app.factory.kitten
  "Logic for handling messages from the kitten factory."
  (:require [ajax.core :refer [GET]]
            [app.exceptions :as exc]
            [app.reports :as report] ; namespace name is missing in the source; app.reports is a placeholder
            [cljs.core.async :as async])
  (:require-macros
    [cljs.core.async.macros :as async-macros]))

;; define our channels
(def channel (async/chan 5))
(def error-channel (async/chan 5))

;; simple helper
(defn enqueue-val
  "Enqueue a new value into our channel."
  [c v]
  (async-macros/go (async/>! c v)))

(defn poll-kitten-factory
  "Poll the kitten factory endpoint for its current status."
  []
  (GET "/kitten-factory"
       {:handler (fn [r] (enqueue-val channel r))
        :error-handler (fn [r] (enqueue-val error-channel r))}))

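(defn listen
  "Listen for the latest message from the kitten factory channels."
  []
  (async-macros/go
    (while true
      (let [[v ch] (async/alts! [channel error-channel])]
        ;; `case` compares against literal symbols, so use `condp =`
        ;; to compare against the channel values themselves
        (condp = ch
          channel (report/send-success-report-to-cat-hq v)
          error-channel (exc/report-exception v))))))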

An alternative pattern that we could embrace would be one in which we passed the errors themselves into the channel and checked to see if a message was an error when consuming. This is a pattern that David Nolen has written about previously on his blog here.

If you read his post, he refers to a <? macro, the source code for which is available on his blog's GitHub repo. The <? macro just checks to see if the value that’s been pulled off of the channel is an error, and if it is, throws it. Without this, we’d just be passing an error around by value and nothing would ever actually throw it (which also means we could never catch it).

We'll copy the relevant code from his blog's GitHub repo into our app, modulo a few changes:

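(ns app.helpers)

(defn throw-err
  "Throw x if it is a js/Error; otherwise return it unchanged."
  [x]
  (if (instance? js/Error x)
    (throw x)
    x))

;; macros have to live in a Clojure (.clj or .cljc) file
(ns app.macros)

(defmacro <?
  "Actively throw an exception if something goes wrong when waiting on a channel message."
  [expr]
  `(app.helpers/throw-err (cljs.core.async/<! ~expr)))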

Now that we have our helpers in place, let's change a few parts of our namespace.

;; we only need the one channel now, not two.
(def channel (async/chan 5))

;; we've changed the error-handler to cast the response
;; to a js/Error before putting it on the channel
(defn poll-kitten-factory
  "Poll the kitten factory endpoint for its current status."
  []
  (GET "/kitten-factory"
       {:handler (fn [r] (enqueue-val channel r))
        :error-handler (fn [r] (enqueue-val channel (js/Error. r)))}))

(defn listen
  "Listen for the latest message from the kitten factory channel.
  If message is an error, throw and catch."
  []
  (async-macros/go
    (while true
      (try
        ;; note the use of `<?`
        (let [v (<? channel)]
          (report/send-success-report-to-cat-hq v))
        (catch js/Error e
          (exc/report-exception e))))))

This is a perfectly good solution, but do we really need that <? macro? This is a great chance to use a pure function (a transducer!) instead. Transducers can be passed as an optional argument to the chan constructor function, and they'll perform the given transformation on any values passed through the channel.
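
If it seems odd that a channel can accept a transducer at all, the following sketch may help. A transducer is a function that takes a reducing function and returns a new one, and (roughly speaking) the channel applies it to its own "add this value to my buffer" step. Here conj and a plain vector stand in for that step and the buffer, which is a simplification on my part rather than how core.async is actually written. The names add-incremented, buffer-after-puts, add-if-even and evens-only are made up for illustration.

```clojure
;; `conj` plays the role of the channel's "add to buffer" step.
;; Wrapping it with (map inc) gives a new step function that
;; transforms each value before storing it.
(def add-incremented ((map inc) conj))

;; Three "puts" into an initially empty "buffer"
(def buffer-after-puts
  (-> []
      (add-incremented 1)
      (add-incremented 2)
      (add-incremented 3)))

;; A filtering transducer can also decide to store nothing at all.
(def add-if-even ((filter even?) conj))

(def evens-only (reduce add-if-even [] [1 2 3 4]))
```

The thing to notice is that the transformation happens at the moment a value is added, before anyone takes it back out.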

This means that we have an opportunity to write a transducer for our channel that just maps our helpful throw-err function over values passing through the channel. Let's see what that looks like:

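;; Initialize the channel with a transducer (`h` is an alias for
;; app.helpers). The transformation runs as values are added to the
;; channel's buffer, so an exception it throws never reaches the code
;; taking from the channel. It goes to the channel's exception handler,
;; the optional third argument to `chan`; returning nil from the
;; handler means nothing is put on the channel for that value.
(def channel
  (async/chan 5
              (map h/throw-err)
              (fn [e] (exc/report-exception e) nil)))

;; note: no change to poll-kitten-factory from our
;; last example; we're still casting errors to
;; js/Error before enqueuing

(defn listen
  "Listen for the latest message from the kitten factory channel."
  []
  (async-macros/go
    (while true
      ;; back to normal `<!` macro, and no try/catch needed
      (let [v (async/<! channel)]
        (report/send-success-report-to-cat-hq v)))))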

Wow - simple!

Let's take a look at another example - this one courtesy of my friend Allen (@arohner), with a few modifications.

Batching Data

Let's say we've got a similar setup to earlier, but instead we want to batch up 10 events before we send a report back to cat hq. If we were going to design this without transducers, we'd probably do something like the following:

;; two channels. `single` is being populated by
;; polling logic, `batch` is being populated here.
(def single (async/chan 100))

;;  taking from batch returns:
;; [kitten kitten kitten kitten...]
(def batch (async/chan 50))

(defn batch-kittens
  "Batch up kittens into vectors of length 10 and put
   them into the batch channel."
  []
  (let [batch-size 10
        state (atom [])]
    (async-macros/go
      (while true
        (let [v (async/<! single)]
          (swap! state conj v)
          (when-let [vs (first (partition batch-size @state))]
            (reset! state [])
            (async/>! batch vs)))))))

;; let's not worry about error handling for now
(defn listen
  "Listen for batch jobs and send them."
  []
  (async-macros/go
    (while true
      (let [v (async/<! batch)]
        (report/send-batch-report-to-cat-hq v)))))

This could probably be cleaned up some more, but at the very least it feels like we're doing more work than we should be. By contrast, if we were to use a transducer to batch these up, it would look like the following:

;; we no longer need two channels. `channel`
;; is populated upstream and we pull from it here
(def channel (async/chan 50 (partition-all 10)))

;; we don't need a batch-kittens function anymore either.

(defn listen
  "Listen for batch jobs and send them."
  []
  (async-macros/go
    (while true
      (let [v (async/<! channel)]
        (report/send-batch-report-to-cat-hq v)))))

This is unquestionably much cleaner than the version without transducers. As Allen put it: "You put one kitten in, and take ten out. You can't explain that."
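
If you want to poke at the batching behaviour without setting up any channels, the same transducer can be tried on a plain collection. This is just a sketch; the name batches is made up, and 25 input values is an arbitrary choice to show what happens to a leftover partial batch.

```clojure
;; Called with only a size, `partition-all` returns a transducer.
(def batches (into [] (partition-all 10) (range 25)))

;; `into` signals the end of the input, so the leftover five values
;; are flushed as a final, smaller batch.
(def batch-sizes (mapv count batches))
```

As far as I understand it, the channel version behaves the same way: full batches of ten come out as they fill up, and a trailing partial batch is only flushed once the channel is closed.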


The thing that stands out to me about transducers is that they empower us in a way that is simple. As a language feature, it's easy to imagine an implementation where performing data transformations on asynchronous streams could have been over-engineered and a nightmare to write code for. Instead, Clojure(Script)'s transducers are easy to read, write and reason about.

There are plenty of other applications for transducers that I haven't talked about in this post, but for me, the core.async tie-in is where their value was first made clear. If you think transducers are n.b.d. and can explain why I can't find that image of Rich Hickey pasted onto the guy from Ancient Aliens on Google, feel free to let me know.

~ V

Discuss this post on Hacker News or on Twitter (@venantius)

Thanks to Keith Ballinger (@keithba), Bill Cauchois (@wcauchois), Katherine Geenberg (@MyWolfyMaw), and Allen Rohner (@arohner) for not only reading drafts of this post, but for adding to it.

Finally, a shameless plug: if this is the sort of thing that interests you and you're interested in learning more, pre-order a copy of the book!