By day, I work for a large investment bank in a department that calculates and distributes ‘Risk and PnL’. Simply put, for every transaction the bank makes, a large amount of data must be re-computed periodically to help the bank understand its exposure to events in the financial markets. Like many large organisations, banks produce large data files that must be processed. However, the files are often so large that they’re impossible to load into the memory of a single 64-bit JVM.
Processing files as a stream is a good strategy. It avoids ‘out of memory’ errors by trading scarce memory for a plentiful supply of disk space. How do you process a large file as a stream in Clojure?
We start by breaking the file up into a lazy sequence of tokens. The Clojure core library contains a function called line-seq that breaks a file into individual lines. Alternatively, if you need finer granularity to form your own tokens, you can use the input-stream or reader functions in the clojure.java.io namespace to turn a file into a lazy sequence of bytes or characters, depending on the nature of the data you are reading.
(ns example
  (:require [clojure.java.io :as io]))

(line-seq (io/reader "bigfile"))
This is simplified for the purposes of this article. In real-world scenarios you may choose to improve performance by using NIO memory-mapped files. These techniques utilize main memory when it’s available without breaking the system when it isn’t.
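One practical note: the reader should be closed when we’re done with it. Here’s a minimal sketch (the function name is illustrative), assuming the whole lazy sequence is consumed inside the scope of the reader:

```clojure
(require '[clojure.java.io :as io])

;; line-seq is lazy, so the sequence must be fully consumed before
;; the reader is closed; with-open guarantees the close.
(defn count-lines [path]
  (with-open [rdr (io/reader path)]
    (count (line-seq rdr))))
```

If the lazy sequence escapes the with-open block, the reader will already be closed by the time the sequence is realized, so force all consumption inside the block.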
Tokenizing
Invariably you will form a lazy sequence of tokens in some way. How should we go about processing these tokens? I’ve found that a state machine is a useful approach. We first define some initial state. Our tokens are fed, along with the existing state, to a function (which we’ll call f). The function returns a new, possibly changed, state.
Pacman
There are lots of other uses for this idea. Imagine we are asked to re-create the game of Pacman. We can list all the elements of the Pacman world :-
- Our hero, Pacman
- The positions of the ghosts
- The maze
- The pills
- The magic pills
- The existence of fruit
- Whether the ghosts are afraid of Pacman or not
- The player’s score
- The number of lives the player has left
- The state of the player’s joystick
We can store all this data in a single data structure we call ‘the state’. We define a function, f, which is given the current world as an argument and which computes the new world we would expect to see on the screen one frame later. The job of programming the game of Pacman is then greatly simplified: we just call f iteratively, letting the game run as a ‘feedback loop’. All we have to do is implement the function f. (That job is left to the reader!)
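To sketch the idea (all names and fields here are illustrative, not a real implementation):

```clojure
;; The whole Pacman world as one map (illustrative fields only).
(def initial-world
  {:pacman {:x 14 :y 23}
   :score  0
   :lives  3})

;; A stand-in for f: a real version would move Pacman and the ghosts,
;; check collisions, and so on. This one just awards a point per frame.
(defn next-frame [world]
  (update world :score inc))

;; The feedback loop: a lazy sequence of successive frames.
(def frames (iterate next-frame initial-world))

(:score (nth frames 60))
;; => 60
```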
How to apply the state-machine pattern in Clojure
There are a number of ways you can apply the state-machine method in Clojure. One simple way is to use the ‘second form’ of the reduce function in the Clojure core library (the one that takes an initial state).
(reduce f {} coll)
This first calls the function f with two arguments: an initial state (we used an empty map here) and the first element of the collection. The result becomes the first argument of the next invocation of the same function, whose second argument is the next element of the collection. This repeated function application continues until the collection is exhausted.
Let’s pretend our tokens are the characters H, A and L. This would be the result of applying reduce :-
(f (f (f {} \H) \A) \L)
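To make this concrete, here’s a toy f (purely illustrative) that accumulates each token into the state:

```clojure
;; A toy f: conj each token onto a :seen vector in the state.
(defn f [state token]
  (update state :seen (fnil conj []) token))

(reduce f {} [\H \A \L])
;; => {:seen [\H \A \L]}
```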
The reduce function is a good starting point, but it is usually the wrong choice for parsing token streams :-
- It is not possible to look ahead to know what subsequent tokens will be
- The function must always consume each token and cannot push the token back onto the stream.
Many parsers need both these features. So we need a refinement.
There’s also another, subtler problem with reduce that we’ll come on to :-
- We can’t inspect the operation of reduce while it is running; we have to wait for it to complete.
Using iterate
Fortunately the Clojure core library contains a more general method for the repetitive application of a function: iterate.
Here’s how we can use iterate to replace reduce.
(iterate f {:remaining coll})
Our state is initialized with the stream itself. The function has the responsibility for taking the first element and processing it. It returns a new state with a new entry representing the remaining elements. Usually the function will replace the :remaining entry with the remaining elements of the collection once the first element has been processed. However, it can do anything it chooses with this entry, including looking ahead at future tokens and pushing back old ones.
The function is repeatedly applied with the map. The map can contain any state that is necessary for the processing of the stream. Often this includes stacks (vectors) to retain state at each level of context.
For the purposes of the rest of this article we’ll rewrite this using the ->> threading macro. We use this macro a lot when manipulating sequences.
(->> {:remaining coll}
     (iterate f))
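As a sketch, here’s a hypothetical step function that consumes one token per application and counts the tokens it has seen:

```clojure
;; One state transition: take the first token off :remaining and
;; keep a running count of tokens processed.
(defn step [{[token & more] :remaining :as state}]
  (-> state
      (assoc :remaining more)
      (update :count (fnil inc 0))))

(->> {:remaining [:a :b :c]}
     (iterate step)
     (take 4))
;; four states: the initial one, then one per token consumed;
;; the fourth is {:remaining nil, :count 3}
```

Note that the first element of the resulting sequence is the initial state itself, and that without a stopping condition iterate will keep applying step forever, which is the subject of the next section.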
Preventing infinite application
The problem with iterate is that it tends to go on forever. We can avoid this by adding an :end entry to our map when we detect the end of the stream. We can then make the resulting sequence finite by using take-while.
(->> {:remaining coll}
     (iterate f)
     (take-while #(not (contains? % :end))))
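For example, here is a hypothetical step function (assuming no token is itself nil) that raises the :end flag when the stream is exhausted:

```clojure
;; Consume one token per step; when none remain, mark :end.
;; Assumes no token in the stream is nil.
(defn step [{[token & more] :remaining :as state}]
  (if token
    (assoc state :remaining more)
    (assoc state :end true)))

(->> {:remaining [:a :b]}
     (iterate step)
     (take-while #(not (contains? % :end)))
     count)
;; => 3  (the initial state plus one state per token consumed)
```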
Since iterate returns a sequence of states, we can choose to inspect any single state.
Yielding
Often we want to parse a stream of tokens into a stream of something else, usually something more concrete and coarse-grained.
For example, huge XML files are often the result of repeated structures. In my case, this could be the calculated risk for every trade in a book. The structure for each trade isn’t complicated, but a large number of trades can make the resulting file very large. We have an algorithm for turning a stream of XML events into a stream of ‘virtual’ XML documents, each representing the entire XML file as it would look if it contained just a single trade. The ‘stream’ is made up of these virtual XML documents, one for each trade. This allows us to tame the size of the file and avoid reading the entire file into memory.
The way to solve this problem is to allow our function to ‘yield’ whenever it has enough information to return a new data structure. We use the key :yield in the returned map to indicate this. We use filter to filter out interim states.
(->> {:remaining coll}
     (iterate f)
     (take-while #(not (contains? % :end)))
     (map :yield)
     (filter #(not (nil? %))))
Factoring out the pattern
It’s time to put all this together in its own function.
(defn parse [f coll]
  (->> {:remaining coll}
       (iterate f)
       (take-while #(not (contains? % :end)))
       (map :yield)
       (filter #(not (nil? %)))))
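Here’s the whole pattern in action, with a hypothetical step function that yields successive pairs of tokens:

```clojure
;; The parse function from above.
(defn parse [f coll]
  (->> {:remaining coll}
       (iterate f)
       (take-while #(not (contains? % :end)))
       (map :yield)
       (filter #(not (nil? %)))))

;; Yield each consecutive pair of tokens as a vector.
(defn pairs [{[a b & more] :remaining}]
  (if a
    {:remaining more :yield [a b]}
    {:end true}))

(parse pairs [1 2 3 4 5 6])
;; => ([1 2] [3 4] [5 6])
```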
Destructuring
We now discuss the function f. Before we provide examples, let’s explain the structure of these functions a little more. The function f is given a map as its only parameter.
This example function does nothing except consume the first token. It does this by applying next to the :remaining entry in the map.
(fn [m]
  {:remaining (next (:remaining m))})
This is less convenient than if we had named arguments for everything in the map, but we can use destructuring to get close to this.
So here’s our function with some destructuring :-
(fn [{remaining :remaining}]
  {:remaining (next remaining)})
But usually we want to do something with each token in the stream. Let’s destructure a little more.
(fn [{[token & remaining :as stream] :remaining}]
  {:remaining remaining})
This binds token to the token in the stream we shall process, remaining to the collection of tokens that remain in the stream after the token, and stream to the collection of remaining tokens including the token itself. This last binding is achieved using the :as keyword, and is useful when we want to return a new state without consuming the token, equivalent to a pushback.
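As a contrived sketch of pushback (the :header token and :in-header? flag are invented for illustration), here’s a function that, on first meeting a :header token, switches mode and returns the stream untouched, so the very same token is presented again on the next application:

```clojure
;; On the first :header token: change state but push the token back
;; by returning the whole stream as :remaining. Otherwise consume.
(defn step [{[token & more :as stream] :remaining :as state}]
  (cond
    (nil? token)
    (assoc state :end true)

    (and (= token :header) (not (:in-header? state)))
    (assoc state :in-header? true :remaining stream)

    :else
    (assoc state :remaining more)))
```

Applying step twice to {:remaining [:header :x]} shows the effect: the first application sets :in-header? while :header is still the next token; only the second application consumes it.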
Notice also how destructuring allows us to balance the complexity of the function body with the function signature. Let’s digress while we’re discussing destructuring to show how far we can go.
Extreme destructuring
[Jay Fields provides a good overview][jf] of the destructuring ‘mini-language’ inside Clojure. At the time of writing there aren’t many other good web resources, so I’ll show how useful destructuring can be for our example.
[jf]: http://blog.jayfields.com/2010/07/clojure-destructuring.html “Jay Fields”
In a real-world situation our function will need to make use of other entries in the map. Let’s assume our map also contains entries for :stack, :indent and :count. We can destructure these in one go using the :keys keyword :-
(fn [{[token & remaining :as stream] :remaining
      :keys [stack indent count]
      :or {indent 0}}]
  {:remaining remaining})
In the case of indent we have also indicated a default value of 0. Variables that do not have corresponding keys in the map argument, and which do not have defaults, get the value nil.
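The same destructuring form works in a let, which makes it easy to see exactly which bindings it produces:

```clojure
;; The destructuring from above, in a let so we can see each binding.
;; :stack is present, :indent is absent but defaulted, :count is
;; absent with no default.
(let [{[token & remaining :as stream] :remaining
       :keys [stack indent count]
       :or {indent 0}}
      {:remaining [:a :b] :stack []}]
  [token (vec remaining) stream stack indent count])
;; => [:a [:b] [:a :b] [] 0 nil]
```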
Footnotes
The pattern described involves a lot of map manipulation. Of course, thanks to Clojure’s immutable persistent data structures we can derive new versions of the map without worrying about where else we’ve referenced it, since those values won’t change. However, since we often want to change multiple values in the map it would be useful to have a helper function.
(defn mapply
  "Apply functions to values in a map."
  [m & kfs]
  (apply hash-map
         (mapcat (fn [[k f]] [k (f (get m k))])
                 (partition 2 kfs))))
In some situations mapply has certain advantages over assoc. With assoc we have to get the value out of the map, apply a function to it and then put it back in the map, being careful to use the same key for both the access and the overwrite. By contrast, with mapply we send the function straight to the map entry.
With mapply, keys that are not specified are removed from the map. This is usually the safest thing to do, as it prevents temporary entries from bleeding across multiple state transitions. But it is easy to retain existing values in the map by specifying the key with the value identity.
;; Increment :count and retain the value of :foo.
(mapply {:count 1 :foo "foo"} :count inc :foo identity)
Finally, mapply can insert a new value, just as assoc does, by applying the constantly function.
;; Remove :count and set the :foo to "bar".
(mapply {:count 1 :foo "foo"} :foo (constantly "bar"))
Nice writeup! I’m still quite new to Clojure, and have been wondering about how one would approach solving stream processing problems. I had gotten as far as figuring it would be lazy sequence based, but hadn’t worked out any details. Thanks for this.
Is there a line missing in the last code snippet before the Footnotes heading? The square bracket isn’t closed, and there’s no :or clause to set a default value for indent.
@Mark Nutter
Thanks for spotting that Mark, it was there in the source but not appearing … just teething problems of a new blog I hope…
I’m a bit confused, you mentioned that you wanted to use the field :yield in the map to indicate that you want to yield a value, so, shouldn’t your filter line be the following? I’m still learning here so this is an honest question.
(filter #(contains? :yield %))
@Dave
That’s embarrassing – I left out a critical line of code. Yes, you can return the entire map, or just the value you are yielding, depending on what you need. I’ve fixed the article now which shows the latter case. Your code snippet correctly does the former.
Nice writeup. I hadn’t thought of mixing :keys with direct map destructuring – learn something new every day :)
Your ns :require seems to be missing ?
Isn’t update-in fairly close to what you want to achieve with mapply? It of course retains non-specified keys, but I suppose that could be fixed using select-keys?
As a clojure newcomer I recently spent quite some time figuring your method out, only to see that my problem (infinite processing) lies in the usage of contains?: It should be (contains? % :end).
Thanks for spotting that Dirk, I’ve fixed the article now. Please accept my apologies for the confusion the mistake caused.