Executing System Processes and Illustrating FP in Clojure

Most programming languages have some facility for executing commands at the OS's shell prompt. Maybe it's just me, but I've never found it easy to manage system processes properly (i.e., to represent each process individually through the language's constructs). Properly representing multiple processes piped from one to the next seemed like a pipe dream (sorry). The first painless way of doing this, at least among all the different options I've seen, is in Clojure with the clj-commons-exec project. At the moment, it seems like a hidden gem. A side benefit of the project is that the evolution of its code for piping one process to the next is, at least for me, instructive on the difference between the object-oriented and functional paradigms.

To be more specific about what I mean by running commands and piped commands via language constructs, I'm not referring to Perl's system, Java's Runtime.exec(), Python's os.system, or the first 3 of these 6 ways in Ruby. These methods take the full command, create a sub-process (in some cases a shell), and execute the command as-is. Using these methods usually doesn't end well. There were enough problems that Python's documentation steers users away from os.system and the older popen variants toward the subprocess module, while Java unveiled ProcessBuilder in Java 1.5 to improve on the flaws of plain Runtime.exec. I'm not sure what the current best way is in Ruby, but the latter 3 of those 6 ways in Ruby seem to be styled after Python's Popen. Urk.

On top of that, ProcessBuilder and subprocess are representative examples of their respective languages' approaches. In my experience, they are some mix of the tedious and the irregular, and the result is ultimately confusing. In Java, in addition to ProcessBuilder, there are other projects out there to make this a little less painful. An honorable mention goes to java-exec: it has a simple interface and a few useful features that most other libraries do not attempt. But Apache Commons Exec is our winner. It has been around much longer, and Apache Ant has used its code for a long time, so you know it has been exercised heavily across all platforms. Of the libraries I've seen, only Apache Commons Exec and java-exec can set a time limit on the executing process, and Commons Exec is customizable in a few more ways.
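To give a feel for that time-limit feature, here is a minimal sketch of Commons Exec's watchdog through Clojure's Java interop. It assumes the commons-exec jar is on the classpath and a Unix-like system where a sleep command exists; the 2000 ms timeout is an arbitrary value for illustration.

```clojure
(import '[org.apache.commons.exec CommandLine DefaultExecutor
                                  ExecuteWatchdog ExecuteException])

(let [cmd      (CommandLine/parse "sleep 10")
      ;; the watchdog destroys the process if it runs longer than 2000 ms
      watchdog (ExecuteWatchdog. 2000)
      executor (doto (DefaultExecutor.) (.setWatchdog watchdog))]
  (try
    (.execute executor cmd) ; blocks; returns the exit code on success
    (catch ExecuteException e
      ;; true when the watchdog, not the program itself, ended the process
      (.killedProcess watchdog))))
```

The same DefaultExecutor / watchdog machinery is what clj-commons-exec wraps underneath.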

Exec has been wrapped for use in Clojure by clj-commons-exec. Takahiro Hozumi has done a good job with it. The result of the execution is returned as a promise. There are a few nice conveniences, like the possibility of providing input as an existing stream or a string. If the user does not specify streams with which to capture output and error streams, those streams are automatically converted to strings. Given that, I wanted the ability to pipe one command's output stream to the next in a similarly easy fashion. The following is a series of iterations that achieve the same task in an increasingly functional programming style, with the code becoming shorter and more elegant at each step.
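To make the basic usage concrete, here is a minimal sketch based on my reading of the clj-commons-exec README; the option keys (:in, :out, :err) are the same ones the sh-pipe code below pulls out of opts.

```clojure
(require '[clj-commons-exec :as exec])

;; sh returns a promise; deref to block until the command finishes.
;; With no :out/:err streams supplied, output is captured as strings.
@(exec/sh ["echo" "hello"])
;; => a map along the lines of {:exit 0, :out "hello\n", :err nil}

;; Input can be supplied as a string via the :in option.
@(exec/sh ["cat"] {:in "some text"})
```

The promise is what lets the command run asynchronously while still giving you a single value to wait on.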

The general idea is to specify the input, output, and error streams for each process in a series of piped commands. Since the user can specify the input, output, and error streams for a single command, we can just use the same syntax — a beginning input stream and final output and error streams. We use the input stream for the first piped command, and the output and error streams for the last piped command. In between every pair of commands, we use a pair of PipedOutputStream and PipedInputStream objects that are connected to each other to create the pipe.
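The pipe plumbing itself is plain java.io: constructing a PipedInputStream from a PipedOutputStream connects the pair, so bytes written to one can be read from the other. A tiny standalone sketch (assuming Java 9+ for readAllBytes):

```clojure
(import '[java.io PipedOutputStream PipedInputStream])

(let [pos (PipedOutputStream.)
      pis (PipedInputStream. pos)] ; constructing from pos connects the pair
  (.write pos (.getBytes "hi"))
  (.close pos)                     ; close the write end so the reader sees EOF
  (String. (.readAllBytes pis)))
;; => "hi"
```

In sh-pipe, each such pos/pis pair sits between two consecutive commands: pos becomes one command's output stream, and the connected pis becomes the next command's input stream.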

Iteration #1: loop

The first iteration comes from me. I thought about using map to create each process, but the problem is that each PipedOutputStream / PipedInputStream pair should be created at the same time to make it easier to ensure they are properly connected, yet the two streams of a pair belong to different commands. reduce applies to each pair of consecutive commands, but the problem there is that it yields a single value from an input list, and we need 2*(n-1) pipe objects from a list of n commands. Given that complexity, I decided the only way to do this was to resort to old-fashioned, imperative, stateful looping using loop.

(defn sh-pipe [& args-and-opts]
  (let [[cmds-list [opts]] (parse-args-pipe args-and-opts)
        in (when-let [i (:in opts)]
              (if (string? i) (string->input-stream i (:encode opts)) i))
        out (or (:out opts) (ByteArrayOutputStream.))
        err (or (:err opts) (ByteArrayOutputStream.))
        num-cmds (count cmds-list)
        nil-streams (into []
                          (for [_ (range num-cmds)]
                            [nil nil nil]))
        pipe-streams (loop [streams nil-streams
                            i 0]
                       (if (>= i (dec num-cmds))
                         streams
                         (let [pos (java.io.PipedOutputStream.)
                               pis (java.io.PipedInputStream. pos)
                               new-streams (-> streams
                                               (assoc-in [i 0] pos)
                                               (assoc-in [(inc i) 2] pis))]
                           (recur new-streams (inc i)))))
        all-streams (-> pipe-streams
                        (assoc-in [0 2] in)
                        (assoc-in [(dec num-cmds) 0] out)
                        (assoc-in [(dec num-cmds) 1] err))
        exec-fn (fn [cmd-and-args [cmd-out cmd-err cmd-in]]
                  (let [new-opts (-> opts
                                     (assoc :out cmd-out)
                                     (assoc :err cmd-err)
                                     (assoc :in cmd-in))
                        sh-fn-args (concat cmd-and-args [new-opts])]
                    (apply sh sh-fn-args)))]
    (last (doall
           (map exec-fn cmds-list all-streams)))))

My idea was to create a 2-D vector of the 3 streams (input, output, error) needed by each process. Once that is done, you can map over its rows, turning each row into a call to the sh function that we already have for executing one standalone command.

As you can see, that 2-D vector gets completed through successive passes of adding/overwriting entries. It's a lot like operating on a mutable data structure, and a cumbersome one, at that. (I've seen Clojure people call operating on mutable data structures "banging on data structures in place".)
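To be fair, assoc-in never actually mutates anything; each pass returns a new, structurally shared vector and leaves the original untouched. A quick illustration (the :stream keyword is just a stand-in value):

```clojure
(let [v  [[nil nil nil] [nil nil nil]]
      v2 (assoc-in v [0 0] :stream)]
  ;; v is unchanged; only v2 sees the update
  [(get-in v [0 0]) (get-in v2 [0 0])])
;; => [nil :stream]
```

It merely *reads* like mutation when you thread one value through pass after pass, which is what makes iteration 1 feel imperative.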

Iteration #2: reduce, for, map, map, ->, …

The second iteration was also mine, an attempt to make the code better and simpler. The idea I came up with was to go ahead and create each pair of piped input/output streams up front. We put each stream in a vector of 3 to keep placeholders for the other 2 streams of its command. That's 2*(n-1) 3-stream vectors, each containing 1 stream object and 2 nils. We also need to allow for user-supplied input to the first command and user-supplied output and error for the last command, so we create the corresponding vectors for a total of 2*n vectors of 3. Each non-overlapping pair of vectors then contains the streams needed for the corresponding command: the input stream sits in the first vector of the pair, while the output and error streams end up in the second. All remaining values are nil. The easy way to pick the non-nil value from a pair is or, and we can be sure of no conflicts (i.e., there are never 2 stream objects in one or call). It's easier to think of everything in terms of lists, which we combine, partition, and transform a few times until the data structures fit the shape we want.

(defn sh-pipe [& args-and-opts]
  (let [[cmds-list [opts]] (parse-args-pipe args-and-opts)
        in (when-let [i (:in opts)]
              (if (string? i) (string->input-stream i (:encode opts)) i))
        out (or (:out opts) (ByteArrayOutputStream.))
        err (or (:err opts) (ByteArrayOutputStream.))
        num-cmds (count cmds-list)
        first-stream-set [nil nil in]
        middle-stream-sets (reduce concat
                                   (for [_ (range (dec num-cmds))]
                                     (let [pos (java.io.PipedOutputStream.)
                                           pis (java.io.PipedInputStream. pos)]
                                       [[pos nil nil] [nil nil pis]])))
        last-stream-set [out err nil]
        all-stream-sets (concat [first-stream-set] middle-stream-sets [last-stream-set])
        all-streams (map (fn [[set1 set2]]
                           (map (fn [stream1 stream2] (or stream1 stream2))
                                set1 set2))
                         (partition 2 all-stream-sets))
        exec-fn (fn [cmd-and-args [cmd-out cmd-err cmd-in]]
                  (let [new-opts (-> opts
                                     (assoc :out cmd-out)
                                     (assoc :err cmd-err)
                                     (assoc :in cmd-in))
                        sh-fn-args (concat cmd-and-args [new-opts])]
                    (apply sh sh-fn-args)))]
    (last (doall
           (map exec-fn cmds-list all-streams)))))

Iteration #3: concat, cons

The final iteration comes from Takahiro, and it's elegant. He creates (n-1) connected pairs of PipedOutputStream and PipedInputStream objects. To reflect the fact that the two streams of a pair belong to different commands, he concats the user's output stream onto the end of the sequence of all output streams, and he conses the user's input stream onto the beginning of the sequence of all input streams. From there, it's a simple map call to execute the commands.

(defn sh-pipe [& args-and-opts]
  (let [[cmds-list [opts]] (parse-args-pipe args-and-opts)
        first-in (when-let [in (:in opts)]
                   (if (string? in) (string->input-stream in (:encode opts)) in))
        num-cmds-1 (-> cmds-list count dec)
        pouts (repeatedly num-cmds-1 #(PipedOutputStream.))
        pins (map (fn [pos] (PipedInputStream. pos)) pouts)
        outs (concat pouts [(:out opts)])
        errs (concat (repeat num-cmds-1 nil) [(:err opts)])
        ins (cons first-in pins)
        opts-list (map (fn [in out err] (assoc opts :in in :out out :err err))
                       ins outs errs)]
    (doall
     (map sh cmds-list opts-list))))

Now, is iteration 3 more functional than iteration 2? If you go by the metric of the number of function calls or first-class functions, iteration 2 has more of them. If we count uses of clever functional idioms, such as using or to return the first non-nil value, iteration 2 still has more. But to get at what I must have meant by "functional": functional programming uses simple functions to perform a task simply. There's no reason to use

(if (not (nil? a))
  a
  b)

when you have that or idiom available to you. It's certainly possible to write imperative code in a functional language, and iteration 1 feels imperative in comparison to iterations 2 and 3. So there is some difference. Perhaps the determining factor, which I'm demarcating somewhat subjectively right now, can begin to be gleaned from how unnecessarily unwieldy iteration 1 is.

You could argue that what makes iteration 3 simpler is just the idea of adding to the front of one sequence and to the back of another so that they're "staggered". And you could say that the idea is simple enough, and not too difficult to implement in most modern languages. You're right. But if you do, then we're veering toward Blub Paradox territory (for example, iteration 3 already relies on a couple of map calls over anonymous functions).

Iteration #0: Clojure’s built-in concurrency

One idea that might be worth exploring, when people have time on their hands, is a more Clojure-y implementation of system command execution. In particular, I mean replacing the Executor functionality within Commons Exec with Clojure's concurrency mechanisms. If I understand correctly, Executor is an object that executes and monitors a command. It launches the command in a new Thread so that the Executor has a way to terminate the command when the Watchdog time limit is reached. A promise is the right construct to use for returning the command output. A ref is accessible across multiple threads, but it will not wait for execution to finish. (You wouldn't want to dereference an output stream that hasn't fully finished being written to, for example.) A future lets you run a command asynchronously in a background thread, and if you access the future from the main thread before it's finished, the deref blocks until it's done. A promise lets you send a result to an earlier point in the code; you might do this because you want to position the code that uses a return value before the code that generates it. But I see the main use case of a promise as delivering a return value back to one thread from a potentially blocking (e.g., I/O) operation initiated by another thread.
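A tiny sketch of those blocking semantics, using nothing but core Clojure (the sleep and the value 42 are arbitrary):

```clojure
;; A future runs its body on a background thread; deref blocks until done.
(def f (future (Thread/sleep 50) :finished))
@f ; blocks for ~50 ms, then => :finished

;; A promise blocks on deref until some thread delivers a value into it.
(def p (promise))
(.start (Thread. (fn [] (deliver p 42))))
@p ; => 42
```

Note the asymmetry: the future computes its own value, while the promise waits passively for another thread to hand it one.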

If we had to recreate the Executor functionality, we would still need one explicitly created Thread to monitor execution and terminate it, per the Watchdog role. But executing the command in a separate thread could be handled by a future, with the advantage that Clojure has built-in functions like future-done? and future-cancel to deal with execution state. To pass the result of the future back to the parent thread, you would employ a promise. So, in other words, a future nested inside a promise.
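Distilled to its skeleton, that pattern looks like this (a sketch; run-async and the sample function are names of my own invention, not part of any library):

```clojure
(defn run-async
  "Runs f on a worker thread, which delivers a future holding f's result.
  The caller double-derefs: once for the promise, once for the future."
  [f]
  (let [result (promise)]
    (.start (Thread. (fn [] (deliver result (future (f))))))
    result))

@@(run-async (fn [] (+ 1 2))) ; => 3
```

In the real thing, the worker Thread would be where the Watchdog-style monitoring lives, and f would be the actual command execution.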

The following code was an early version of mine for running two commands with a shared pipe (before reusing the clj-commons-exec sh function was a consideration). The fact that the code works is almost proof enough that a Thread delivering a promise containing a future would work. You might arrive at a prototype of a Clojure-y version of Commons Exec by taking the code below and replacing the .execute calls with simple, synchronous system calls, perhaps of the Runtime.exec variety.

(defn- run-pipe-2
  "Works for two commands, one piped to the second. Each command is a vector
  of strings. Returns a future that contains a map similar to the one inside
  the sh function's return promise."
  [cmd-and-args1 cmd-and-args2]
  (let [result (promise)]
    ;; putting the result promise in the outer let in case
    ;; the inner let bindings can be GC'ed. this is premature optimization(?)
    (let [cl1 (command-line cmd-and-args1)
          cl2 (command-line cmd-and-args2)
          pos (java.io.PipedOutputStream.)
          pis (java.io.PipedInputStream. pos)
          output (java.io.ByteArrayOutputStream.)
          error (java.io.ByteArrayOutputStream.)
          ;; cmd1 writes its stdout into the pipe; cmd2 reads the pipe as stdin
          exec1 (doto (DefaultExecutor.) (.setStreamHandler (PumpStreamHandler. pos nil nil)))
          exec2 (doto (DefaultExecutor.) (.setStreamHandler (PumpStreamHandler. output error pis)))
          t1-fn (fn [] (.execute exec1 cl1))
          t2-fn (fn [] (deliver result
                                (future
                                  (let [exit-code (.execute exec2 cl2)]
                                    {:exit exit-code
                                     :out (.toString output)
                                     :err (.toString error)}))))
          t1 (Thread. t1-fn)
          t2 (Thread. t2-fn)]
      (.start t1)
      (.start t2))
    @result))

On a practical note, there is quite a bit of convenient functionality in Commons Exec beyond concurrency and timeouts. It works cross-platform, and to do so it devised a scheme for quoting command-line strings on Windows. It also deals with behavior from programs like Photoshop which, unlike nearly every other program out there, uses an exit code of 1 instead of 0 to indicate success; any Clojure-y version of Commons Exec would need to handle that, too. But the benefit of such a hypothetical effort would come when you want the code to be platform (VM) independent, e.g., when Clojure for the CLR is complete.