Syntactic Pipelines

Lately I’ve been thinking about Clojure programs written in this “threaded” or “pipelined” style:

(defn large-process [input]
  (-> input
      subprocess-one
      subprocess-two
      subprocess-three))

If you saw my talk at Clojure/West (video forthcoming) this should look familiar. The value being “threaded” by the -> macro from one subprocess- function to the next is usually a map, and each subprocess can add, remove, or update keys in the map. A typical subprocess function might look something like this:

(defn subprocess-two [data]
  (let [{:keys [alpha beta]} data]
    (-> data
        (assoc :epsilon (compute-epsilon alpha))
        (update-in [:gamma] merge (compute-gamma beta)))))

Most subprocess functions, therefore, have a similar structure: they begin by destructuring the input map and end by performing updates to that same map.

This style of programming tends to produce slightly longer code than would be obtained by writing larger functions with let bindings for intermediate values, but it has some advantages. The structure is immediately apparent: someone reading the code can get a high-level overview of what the code does simply by looking at the outer-most function, which, due to the single-pass design of Clojure’s compiler, will always be at the bottom of a file. It’s also easy to insert new functions into the process: as long as they accept and return a map with the same structure, they will not interfere with the existing functions.

The only problem with this code from a readability standpoint is the visual clutter of repeatedly destructuring and updating the same map. (It’s possible to move the destructuring into the function argument vector, but it’s still messy.)

defpipe

What if we could clean up the syntax without changing the behavior? That’s exactly what macros are good for. Here’s a first attempt:

(defmacro defpipe [name argv & body]
  `(defn ~name [arg#]
     (let [{:keys ~argv} arg#]
       ~@body)))
(macroexpand-1 '(defpipe foo [a b c] ...))
;;=> (clojure.core/defn foo [arg_47_auto]
;;     (clojure.core/let [{:keys [a b c]} arg_47_auto] ...))

That doesn’t quite work: we’ve eliminated the :keys destructuring, but lost the original input map.

return

What if we make a second macro specifically for updating the input map?

(def ^:private pipe-arg (gensym "pipeline-argument"))

(defmacro defpipe [name argv & body]
  `(defn ~name [~pipe-arg]
     (let [{:keys ~argv} ~pipe-arg]
       ~@body)))

(defn- return-clause [spec]
  (let [[command sym & body] spec]
    (case command
      :update `(update-in [~(keyword (name sym))] ~@body)
      :set    `(assoc ~(keyword (name sym)) ~@body)
      :remove `(dissoc ~(keyword (name sym)) ~@body)
      body)))

(defmacro return [& specs]
  `(-> ~pipe-arg
       ~@(map return-clause specs)))

This requires some more explanation. The return macro works in tandem with defpipe, and provides a mini-language for threading the input map through a series of transformations. So it can be used like this:

(defpipe foo [a b]
  (return (:update a + 10)
          (:remove b)
          (:set c a)))

;; which expands to:
(defn foo [input]
  (let [{:keys [a b]} input]
    (-> input
        (update-in [:a] + 10)
        (dissoc :b)
        (assoc :c a))))

As a fallback, we can put any old expression inside the return, and it will be just as if we had used it in the -> macro. The rest of the code inside defpipe, before return, is a normal function body. The return can appear anywhere inside defpipe, as long as it is in tail position.

The symbol used for the input argument has to be the same in both defpipe and return, so we define it once and use it again. This is safe because that symbol is not exposed anywhere else, and the gensym ensures that it is unique.

defpipeline

Now that we have the defpipe macro, it’s trivial to add another macro for defining the composition of functions created with defpipe:

(defmacro defpipeline [name & body]
  `(defn ~name [arg#]
     (-> arg# ~@body)))

This macro does so little that I debated whether or not to include it. The only thing it eliminates is the argument name. But I like the way it expresses intent: a pipeline is purely the composition of defpipe functions.

Further Possibilities

One flaw in the “pipeline” style is that it cannot express conditional logic in the middle of a pipeline. Some might say this is a feature: the whole point of the pipeline is that it defines a single thread of execution. But I’m toying with the idea of adding syntax for predicate dispatch within a pipeline, something like this:

(defpipeline name
  pipe1
  ;; Map signifies a conditional branch:
  {predicate-a pipe-a
   predicate-b pipe-b
   :else       pipe-c}
  ;; Regular pipeline execution follows:
  pipe2
  pipe3)

The Whole Shebang

The complete implementation follows. I’ve added doc strings, metadata, and some helper functions to parse the arguments to defpipe and defpipeline in the same style as defn.

(def ^:private pipe-arg (gensym "pipeline-argument"))

(defn- req
  "Required argument"
  [pred spec message]
  (assert (pred (first spec))
          (str message " : " (pr-str (first spec))))
  [(first spec) (rest spec)])

(defn- opt
  "Optional argument"
  [pred spec]
  (if (pred (first spec))
    [(list (first spec)) (rest spec)]
    [nil spec]))

(defmacro defpipeline [name & spec]
  (let [[docstring spec] (opt string? spec)
        [attr-map spec] (opt map? spec)]
    `(defn ~name 
       ~@docstring
       ~@attr-map
       [arg#]
       (-> arg# ~@spec))))

(defmacro defpipe
  "Defines a function which takes one argument, a map. The params are
  symbols, which will be bound to values from the map as by :keys
  destructuring. In any tail position of the body, use the 'return'
  macro to update and return the input map."
  [name & spec]
  {:arglists '([name doc-string? attr-map? [params*] & body])}
  (let [[docstring spec] (opt string? spec)
        [attr-map spec] (opt map? spec)
        [argv spec] (req vector? spec "Should be a vector")]
    (assert (every? symbol? argv)
            (str "Should be a vector of symbols : "
                 (pr-str argv)))
    `(defn ~name
       ~@docstring
       ~@attr-map
       [~pipe-arg]
       (let [{:keys ~argv} ~pipe-arg]
         ~@spec))))

(defn- return-clause [spec]
  (let [[command sym & body] spec]
    (case command
      :update `(update-in [~(keyword (name sym))] ~@body)
      :set    `(assoc ~(keyword (name sym)) ~@body)
      :remove `(dissoc ~(keyword (name sym)) ~@body)
      body)))

(defmacro return
  "Within the body of the defpipe macro, returns the input argument of
  the defpipe function. Must be in tail position. The input argument,
  a map, is threaded through exprs as by the -> macro.

  Expressions within the 'return' macro may take one of the following
  forms:

      (:set key value)      ; like (assoc :key value)
      (:remove key)         ; like (dissoc :key)
      (:update key f args*) ; like (update-in [:key] f args*)

  Optionally, any other expression may be used: the input map will be
  inserted as its first argument."
  [& exprs]
  `(-> ~pipe-arg
       ~@(map return-clause exprs)))

And a Made-Up Example

(defpipe setup []
  (return  ; imagine these come from a database
   (:set alpha 4)
   (:set beta 3)))

(defpipe compute-step1 [alpha beta]
  (return (:set delta (+ alpha beta))))

(defpipe compute-step2 [delta]
  (return
   (assoc-in [:x :y] 42)  ; ordinary function expression
   (:update delta * 2)
   (:set gamma (+ delta 100))))  ; uses old value of delta

(defpipe respond [alpha beta gamma delta]
  (println " Alpha is" alpha "\n"
           "Beta is" beta "\n"
           "Delta is" delta "\n"
           "Gamma is" gamma)
  (return)) ; not strictly necessary, but a good idea

(defpipeline compute
  compute-step1
  compute-step2)

(defpipeline process-request
  setup
  compute
  respond)
(process-request {})

;; Alpha is 4 
;; Beta is 3 
;; Delta is 14 
;; Gamma is 107

;;=> {:gamma 107, :delta 14, :beta 3, :alpha 4}

14 thoughts on “Syntactic Pipelines”

  1. Great stuff Stuart. This actually reminds me a lot of the old webMethods EAI toolset which featured similar kinds of operations on pipelines that were effectively hashmaps. And it cost half a million pounds…..

    It strikes me that what you are doing is actually very similar to a monadic style of programming (in the sense of binding and return values). If you formalised these pipelines into monads, it might open up some further possibilities (e.g. using monad transformers to add logging, retry capabilities etc.?)

  2. Hi, Stuart,

    Will you package these code as a library?
    It does great help to me. I am looking for way that I can use it in my application.

  3. Nifty. Slight preference for having the default be that `defpipe` wraps the pipeline in `return` for you. (I’d have to write `respond` without using `defpipe`, I suppose, but that’s OK, since it’s not really a pipe in the same sense that `compute-step1` and `compute-step2` are.)

    Also, a typo, I think: in `compute-step1`, shouldn’t the `set` be a `:set`?

  4. Mike: Yes, it is slightly monadic in character. I’ve never been convinced of the utility of monads outside of strictly pure functional languages like Haskell, but that may be simply because I’ve never understood them.

  5. Brian: re. having `return` automatically included in `defpipe`: that would make the macro implementation simpler, but would prevent you from putting anything else in the function body. I wanted `defpipe` functions to be able to do all the things real functions can do, including conditional branching and side effects. For example:

    (defpipe foo [a b]
      (log "This is foo")
      (if (< a 100)
        (return (:update b + 100))
        (return)))
  6. [edited for code formatting -S]

    Shouldn’t side-effecty stuff like logging be isolated from functions containing transformation logic? Are explicit return calls idiomatic in clojure? How does [1] really differs from [1′], and [2] from [2′]? I’m not sure which is shorter.

    [1]
    (defpipe compute-step2 [delta]
      (return
       (assoc-in [:x :y] 42)
       (:update delta * 2)
       (:set gamma (+ delta 100))))
    
    [1']
    (defn compute-step2 [{:keys [delta] :as m}]
      (-> m
          (assoc-in [:x :y] 42)
          (update-in [:delta] * 2)
          (assoc :gamma (+ delta 100))))
    
    [2]
    (defpipeline compute
      compute-step1
      compute-step2)
    
    [2']
    (def compute (comp compute-step2 compute-step1))
  7. Dmitri: It’s not an explicit return call (which Clojure does not support) I just chose the name return because that’s where you put the code that manipulates the return value of the function. As I said, the return macro always has to be in tail position anyway.

    There is no actual difference between [1] and [1′] or between [2] and [2′]. Which you prefer is a matter of taste. My purpose in writing the macros was simply to demonstrate that macros can be used to formalize a repeated syntactic pattern. I’m not sure I’d even use these macros myself, unless I felt like they really added to the clarity of the code.

    One other possibility, which I failed to mention in the article, is that abstracting this pattern into macros could allow alternate implementations. For example, perhaps the macros could be made clever enough to use only local variables and omit the map parameter altogether for better performance. I don’t actually know if this is possible in this case, but it’s interesting to think about.

  8. Very nice piece of work, look forward to see it on GitHub ;)

    Just one improvement imho: I’d personally prefer if users could provide their own map manipulation functions rather than just relying on predefined ones; what do you think?

  9. Sergio: You can already supply your own map-manipulation functions: that’s why the return macro has the original body as the “else” clause in the case expression: if the expression given to return isn’t one of the predefined keywords, it’s just included verbatim.

  10. I came across this post by accident. I’m coming from the Python world and in Django the http middleware behaves just like these pipes, which was something I was trying to implement in Clojure – however you pull it off much more cleverly. I’m just learning macros (from Practical Clojure, your book no less) but for your case where you want conditional pipelines evaluated I wrangled this:


    (defmacro branch [pred & body]
    `(fn [x#]
    (if (~pred x#) (-> x# ~@body) x#)))

    I’m still playing with it, and at first blush it seems to be working:


    (defpipeline compute
    step1
    ((branch some-check
    substep1 substep2 substep3))
    step2)

    There is that *ugly* double parenthesis though. It’s pretty simplistic, but hopefully a shortcut for another, future, reader.

    Thanks for the post and thank you very much for your work on Practical Clojure.

Comments are closed.