Processing large files with Clojure.

By day, I work for a large investment bank in a department that calculates and distributes ‘Risk and PnL’. Simply put, for every transaction that is made by the bank a large amount of data needs to be re-computed periodically to help the bank understand its exposure to various things that can happen in the financial markets. Like many large organisations, banks produce large amounts of data files that must be processed. However, the data files are often so large that they’re impossible to load into the memory of a single 64-bit JVM.

Processing files as a stream is a good strategy: it avoids ‘out of memory’ errors by keeping only a small portion of the file in memory at any one time, leaving the bulk of the data on disk. How do you process a large file as a stream in Clojure?

We start by breaking the file up into a lazy sequence of tokens. The Clojure core library contains a function called line-seq that breaks a file into individual lines. Or you can use the input-stream or reader functions in the clojure.java.io namespace to turn a file into a lazy sequence of bytes or characters if you need finer granularity to form your own tokens depending on the nature of the data you are reading.

(ns example
  (:require [clojure.java.io :as io]))

(line-seq (io/reader "bigfile"))
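As a minimal sketch of the idea, here's how you might count the lines of a (hypothetical) large file without ever holding the whole file in memory. Note that with-open is important: line-seq is lazy, so the sequence must be consumed before the reader is closed.

```clojure
(require '[clojure.java.io :as io])

;; Count the lines of a file without loading it into memory.
;; count consumes the lazy line-seq while the reader is open;
;; with-open then closes the reader.
(defn count-lines [path]
  (with-open [rdr (io/reader path)]
    (count (line-seq rdr))))
```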

This is simplified for the purposes of this article. In real-world scenarios you may choose to improve performance by using NIO memory-mapped files. These techniques utilize main memory when it’s available without breaking the system when it isn’t.

Tokenizing

Invariably you will form a lazy sequence of tokens in some way. How should we go about processing these tokens? I’ve found that a state machine is a useful approach. We first define some initial state. Each token is fed to a function (which we’ll call f) along with the existing state. The function returns a new, possibly changed, state.

Pacman

There are lots of other uses for this idea. Imagine we are asked to re-create the game of Pacman. We can list all the elements of the Pacman world :-

  • Our hero, Pacman
  • The positions of the ghosts
  • The maze
  • The pills
  • The magic pills
  • The existence of fruit
  • Whether the ghosts are afraid of Pacman or not
  • The player’s score
  • The number of lives the player has left
  • The state of the player’s joystick

We can store all this data in a single data structure we call ‘the state’. We define a function, f, which is given the current world as an argument and which computes the new world we would expect to see on the screen one frame later. The job of programming the game of Pacman is then greatly simplified – we just call f iteratively, letting the game run as a ‘feedback loop’. All we have to do is implement the function f. (That job is left to the reader!)

How to apply the state-machine pattern in Clojure

There are a number of ways you can apply the state-machine method in Clojure. One simple way is to use the ‘second form’ of the reduce function in the Clojure core library (that’s the one that takes some initial state).

 (reduce f {} coll)

This first calls the function f with two arguments: an initial state (we used an empty map here) and the first element of the collection. The result becomes the first argument of the next invocation of the same function, while the second argument is the next element of the collection. This repeated application continues until the collection is exhausted.

Let’s pretend our tokens are the characters \H, \A and \L. This would be the result of applying reduce :-

 (f (f (f {} \H) \A) \L)
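As a concrete (if trivial) illustration of a state machine driven by reduce, here f builds a frequency map of the character tokens it has seen – the map is the state, and each token yields a new map:

```clojure
;; A trivial state machine: the state is a frequency map,
;; and each token increments its own count.
(defn tally [state token]
  (update state token (fnil inc 0)))

(tally {} \H)
;; => {\H 1}

(reduce tally {} [\H \A \L \H])
;; => {\H 2, \A 1, \L 1}
```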

The reduce function is a good starting point, but it is usually the wrong choice for parsing token streams :-

  • It is not possible to look ahead to know what subsequent tokens will be
  • The function must always consume each token and cannot push a token back onto the stream.

Many parsers need both these features. So we need a refinement.

There’s also another subtler problem with reduce that we’ll come on to :-

  • We can’t inspect the operation of reduce while it is running; we have to wait for it to complete.

Using iterate

Fortunately the Clojure core library contains a more general method for the repetitive application of a function: iterate.

Here’s how we can use iterate to replace reduce.

(iterate f {:remaining coll})

Our state is initialized with the stream itself. The function has the responsibility for taking the first element and processing it. It returns a new state with a new entry representing the remaining elements. Usually the function will replace the :remaining element with remaining elements of the collection once the first element has been processed. However, it can do anything it chooses with this element, including looking ahead for future tokens and pushing back old ones.

The function is repeatedly applied to the map. The map can contain any state that is necessary for the processing of the stream. Often this includes stacks (vectors) to retain state at each level of context.

For the purposes of the rest of this article we’ll rewrite this using the ->> threading macro. We use this macro a lot when manipulating sequences.

(->> {:remaining coll}
     (iterate f))

Preventing infinite application

The problem with iterate is that it tends to go on forever. We can avoid this by adding an :end into our map when we detect the end of the stream. We can then make the resulting sequence finite by using take-while.

(->> {:remaining coll}
     (iterate f)
     (take-while #(not (contains? % :end))))
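Here's a minimal step function of this shape: it consumes one token per application, accumulates it under an illustrative :seen key, and flags :end once the stream is exhausted.

```clojure
;; Consume one token per application; accumulate it under :seen;
;; flag :end when nothing remains.
(defn step [{:keys [remaining seen] :as state}]
  (if (empty? remaining)
    (assoc state :end true)
    {:remaining (next remaining)
     :seen (conj (vec seen) (first remaining))}))

;; The sequence of states stops just before the :end state.
(->> {:remaining [\H \A \L]}
     (iterate step)
     (take-while #(not (contains? % :end))))
;; The last state's :seen is [\H \A \L].
```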

Since iterate returns a sequence of states, we can inspect any intermediate state while the process is under way.

Yielding

Often we want to parse a stream of tokens into a stream of something else, usually something more concrete and coarse-grained.

For example, huge XML files often consist of repeated structures. In my case, this could be the calculated risk for every trade in a book. The structure for each trade isn’t complicated, but a large number of trades can make the resulting file very large. We have an algorithm for turning a stream of XML events into a stream of ‘virtual’ XML documents, each representing the entire XML file as it would look if it contained just a single trade. The ‘stream’ is made up of these virtual XML documents, one for each trade. This tames the size of the file and avoids reading it all into memory.

The way to solve this problem is to allow our function to ‘yield’ whenever it has enough information to return a new data structure. We use the key :yield in the returned map to indicate this.

We use filter to filter out interim states.

(->> {:remaining coll}
     (iterate f)
     (take-while #(not (contains? % :end)))
     (map :yield)
     (filter #(not (nil? %))))

Factoring out the pattern

It’s time to put all this together in its own function.

(defn parse [f coll]
  (->> {:remaining coll}
       (iterate f)
       (take-while #(not (contains? % :end)))
       (map :yield)
       (filter #(not (nil? %)))))
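Putting it together, here's a worked example. The pairs step function is purely illustrative: it groups the token stream into pairs, yielding each pair as it completes (parse is reproduced so the snippet stands alone).

```clojure
;; parse as defined above.
(defn parse [f coll]
  (->> {:remaining coll}
       (iterate f)
       (take-while #(not (contains? % :end)))
       (map :yield)
       (filter #(not (nil? %)))))

;; A hypothetical step function: yield the tokens two at a time,
;; with a trailing singleton if the count is odd.
(defn pairs [{:keys [remaining]}]
  (cond
    (empty? remaining)      {:end true}
    (= 1 (count remaining)) {:yield [(first remaining)] :remaining nil}
    :else                   {:yield (vec (take 2 remaining))
                             :remaining (drop 2 remaining)}))

(parse pairs [1 2 3 4 5])
;; => ([1 2] [3 4] [5])
```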

Destructuring

We now discuss the function f. Before we provide examples, let’s explain the structure of these functions a little more. The function f is given a map as its only parameter.

This example function does nothing except consume the first token. It does this by applying next to the :remaining entry in the map.

(fn [m] 
    {:remaining (next (:remaining m))})

This is less convenient than if we had named arguments for everything in the map, but we can use destructuring to get close to this.
So here’s our function with some destructuring :-

(fn [{remaining :remaining}] 
    {:remaining (next remaining)})

But usually we want to do something with each token in the stream. Let’s destructure a little more.

(fn [{[token & remaining :as stream] :remaining}] 
    {:remaining remaining})

This binds token to the token in the stream we shall process, remaining to the collection of tokens that remain after token, and stream to the collection of remaining tokens including token. This last binding is achieved using the :as keyword. It is useful when we want to return a new state without consuming the token, equivalent to a pushback.
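For instance, a step function might peek at the token and, under some condition, return the whole stream unconsumed – a pushback. The :mode, :last and :reading names here are purely illustrative:

```clojure
;; Peek at the token: if we are not yet in :reading mode, switch
;; modes and push the token back by returning stream (which still
;; includes the token); otherwise consume it.
(defn maybe-consume
  [{[token & remaining :as stream] :remaining :as state}]
  (if (= :reading (:mode state))
    (assoc state :remaining remaining :last token)   ; consume
    (assoc state :mode :reading :remaining stream))) ; pushback

;; First application pushes back; the second consumes.
(-> {:remaining [\H \A]} maybe-consume maybe-consume)
;; => {:remaining (\A), :mode :reading, :last \H}
```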

Notice also how destructuring allows us to balance the complexity of the function body with the function signature. Let’s digress while we’re discussing destructuring to show how far we can go.

Extreme destructuring

Jay Fields provides a good overview of the destructuring ‘mini-language’ inside Clojure (http://blog.jayfields.com/2010/07/clojure-destructuring.html). At the time of writing there aren’t many other good web resources, so I’ll show how useful destructuring can be for our example.

In a real-world situation our function will need to make use of other entries in the map. Let’s assume our map also contains entries for :stack, :indent and :count. We can destructure these in one go using the :keys keyword :-

(fn [{[token & remaining :as stream] :remaining 
                 :keys [stack indent count] 
                 :or {indent 0}}] 
    {:remaining remaining})

In the case of indent we have also indicated a default value of 0. Variables that do not have corresponding keys in the map argument, and which do not have defaults, are bound to nil.
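A quick sketch of that default behaviour (the map here stands in for our state):

```clojure
;; indent falls back to 0 via :or; stack has no key and no
;; default, so it is bound to nil.
(def probe
  (fn [{:keys [stack indent] :or {indent 0}}]
    [stack indent]))

(probe {:remaining [\H]})
;; => [nil 0]

(probe {:remaining [\H] :stack [:a] :indent 4})
;; => [[:a] 4]
```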

Footnotes

The pattern described involves a lot of map manipulation. Of course, thanks to Clojure’s immutable persistent data structures we can derive new versions of the map without worrying about where else we’ve referenced it, since those values won’t change. However, since we often want to change multiple values in the map it would be useful to have a helper function.

(defn mapply
  "Apply functions to values in a map."
  [m & kfs]
  (apply hash-map
         (mapcat (fn [[k f]] [k (f (get m k))])
                 (partition 2 kfs))))

In some situations mapply has certain advantages over assoc. With assoc we have to get the data value out of the map, apply a function to it and then put it back in the map. We have to be careful to use the same key for the access and overwrite operations. By contrast, with mapply we send the function straight to the map entry.
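For comparison, here is the same increment expressed both ways (mapply reproduced so the snippet stands alone). With assoc the key must be named twice, once to read and once to write; with mapply it appears only once:

```clojure
;; mapply as defined above.
(defn mapply [m & kfs]
  (apply hash-map
         (mapcat (fn [[k f]] [k (f (get m k))])
                 (partition 2 kfs))))

;; With assoc the key appears in both the read and the write:
(let [m {:count 1 :foo "foo"}]
  (assoc m :count (inc (:count m))))
;; => {:count 2, :foo "foo"}

;; With mapply the function goes straight to the entry; note
;; that :foo must be retained explicitly with identity:
(mapply {:count 1 :foo "foo"} :count inc :foo identity)
;; => {:count 2, :foo "foo"} (key order may vary)
```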

With mapply, keys that are not specified are removed from the map. This is usually the safest thing to do to prevent temporary entries from bleeding across multiple state transitions. But it is easy to retain existing values in the map by specifying the key with the value of identity.

;; Increment :count and retain the value of :foo.
(mapply {:count 1 :foo "foo"} :count inc :foo identity) 

Finally, mapply can insert a new value, just like assoc does, by applying the constantly function.

;; Remove :count and set the :foo to "bar".
(mapply {:count 1 :foo "foo"} :foo (constantly "bar"))

10 thoughts on “Processing large files with Clojure.”

  1. Nice writeup! I’m still quite new to Clojure, and have been wondering about how one would approach solving stream processing problems. I had gotten as far as figuring it would be lazy sequence based, but hadn’t worked out any details. Thanks for this.

  2. Pingback: links for 2011-07-11 « Blarney Fellow

  3. Is there a line missing in the last code snippet before the Footnotes heading? The square bracket isn’t closed, and there’s no :or clause to set a default value for indent.

  4. I’m a bit confused, you mentioned that you wanted to use the field :yield in the map to indicate that you want to yield a value, so, shouldn’t your filter line be the following? I’m still learning here so this is an honest question.


    (filter #(contains? % :yield))

    • @Dave
      That’s embarrassing – I left out a critical line of code. Yes, you can return the entire map, or just the value you are yielding, depending on what you need. I’ve fixed the article now which shows the latter case. Your code snippet correctly does the former.

  5. Isn’t update-in fairly close to what you want to achieve with mapply? It of course retains non-specified keys, but I suppose that could be fixed using select-keys?

  6. As a clojure newcomer I recently spent quite some time figuring your method out, only to see that my problem (infinite processing) lies in the usage of contains?: It should be (contains? % :end).

    • Thanks for spotting that Dirk, I’ve fixed the article now. Please accept my apologies for the confusion the mistake caused.
