Scalable sorting

Sorting is an important step in bringing data together for aggregation and reporting.

Most Clojure sort algorithms you’ll find on the web assume you can fit all the items to sort into the memory of a single JVM instance. For example, many merge sorts use split-at or partition to divide the data set into two in-memory halves.
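For illustration, here’s a sketch of the kind of in-memory merge sort those examples typically show (the names merge-two and in-memory-merge-sort are made up, and split-at is used to divide the input into two halves). It assumes the whole data set fits comfortably in memory :-

(defn merge-two
  "Merges two already-sorted sequences according to comparator comp."
  [comp l r]
  (lazy-seq
   (cond
     (empty? l) r
     (empty? r) l
     (pos? (comp (first l) (first r))) (cons (first r) (merge-two comp l (rest r)))
     :else (cons (first l) (merge-two comp (rest l) r)))))

(defn in-memory-merge-sort
  "Classic merge sort - assumes the whole collection fits in memory."
  ([coll] (in-memory-merge-sort compare coll))
  ([comp coll]
   (if (< (count coll) 2)
     coll
     ;; split-at realises both halves, so the entire data set must fit in memory
     (let [[left right] (split-at (quot (count coll) 2) coll)]
       (merge-two comp
                  (in-memory-merge-sort comp left)
                  (in-memory-merge-sort comp right))))))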

Where I work we frequently have to handle data sets far too large for these algorithms, so we have to find alternatives.

Here’s our approach to sorting large data sets. Although we did spend some time researching the problem, there are bound to be better algorithms out there. Please let me know if you find one!

Divide and conquer

To sort a large dataset we break it into smaller ‘bite-sized’ chunks which we can sort in memory, and write these sorted chunks to disk. Then we can run a lazy merge sort against these chunks. Although we currently run this on one node, the nature of merge sort is that it can scale out recursively and is compatible with map-reduce. In production use we’ve found that even on a single node, huge datasets need a small degree of recursion just to avoid having too many chunks and running out of file handles. However, we’ll ignore recursion for the purposes of this discussion (we can always add it later).
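Here’s a minimal sketch of that chunking step (the helpers write-sorted-chunks! and chunk-seq are hypothetical, not our production code), assuming the items can be printed and read back as EDN :-

(require '[clojure.java.io :as io])

(defn write-sorted-chunks!
  "Sketch only: splits items into chunks of chunk-size, sorts each chunk in
  memory with comp and writes one EDN value per line to a temp file.
  Returns the chunk files."
  [comp chunk-size items]
  (doall
   (for [chunk (partition-all chunk-size items)]
     (let [f (java.io.File/createTempFile "chunk" ".edn")]
       (with-open [w (io/writer f)]
         (doseq [item (sort comp chunk)]
           (.write w (prn-str item))))
       f))))

(defn chunk-seq
  "Sketch only: lazily reads a chunk file back as a sequence of items.
  (Real code would also need to arrange for the reader to be closed.)"
  [f]
  (map read-string (line-seq (io/reader f))))

;; The sorted chunks can then be handed to the merge sort described below:
;; (merge-sort comp (map chunk-seq (write-sorted-chunks! comp 100000 items)))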

A simple merge-sort function

First, let me present our merge-sort function, minus the requisite doc-strings and comments.

(defn merge-sort
  ([^java.util.Comparator comp colls]
     (letfn [(next-item [[_ colls]]
               (if (nil? colls)
                 [:end nil]
                 (let [[[yield & p] & q] 
                       (sort-by first comp colls)]
                   [yield (if p (cons p q) q)])))]
           (->> colls
                (vector :begin) 
                (iterate next-item)
                (drop 1)
                (map first)
                (take-while (partial not= :end)))))
  ([colls]
     (merge-sort compare colls)))

Firstly, note that the main function overload takes two arguments. The first is the Java comparator we’ll use to sort the data; the second is a collection of sorted sequences. These sequences don’t have to be in memory: they can be lazy, with their elements read in as we need them (from disk, network, etc.). However, we do assume the sequences have already been sorted with the same Java comparator given in the first argument. At the lowest level of recursion these sequences will be small enough to be sorted in memory and written out to disk – this is fairly trivial so we won’t cover it here.

We define a function called next-item which takes the collection of sequences and returns a pair:

  • the first value is the next result in the merge-sorted sequence. This is called the yield.
  • the second value is the same collection of sequences we started with minus the yielded value. This is called the remainder and is used to calculate the next result.

Here’s the next-item function (first draft) :-

(fn [colls]
    (let [[[yield & p] & q] 
          (sort-by first comp colls)]
        [yield (cons p q)]))

The first job of the function is to sort the collection of sequences by their first values.

(sort-by first comp colls)

Here, comp is just the Java comparator that dictates the order of the sort.
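For example, with compare as the comparator and some made-up collections, the sequence whose first element is smallest ends up at the front :-

user> (sort-by first compare '((3 4 8) (1 5 9) (2 2 7)))
((1 5 9) (2 2 7) (3 4 8))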

Now we destructure. Since the item we want is the first item of the first sequence, we destructure like this :-

[[yield & p] & q]
  • yield is the item we are saying is the next item in the merged sorted sequence.
  • p is the rest of the sequence which contained yield.
  • q is the collection of all the other sequences.

(A nice ‘by-product’ of the sort-by is that q is already sorted. Even if the contents of p demand that we sort again, it won’t be an expensive operation. In fact, my own tests have shown that little or no performance improvement can be made by avoiding the use of Java’s sort by moving the head collection down the list – Java’s sort is very efficient for sets that are either ‘already sorted’ or ‘almost sorted’.)
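To make the destructuring concrete, here’s a hypothetical REPL example using the same made-up collections as above :-

user> (let [[[yield & p] & q] '((1 5 9) (2 2 7) (3 4 8))]
        {:yield yield :p p :q q})
{:yield 1, :p (5 9), :q ((2 2 7) (3 4 8))}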

Now we need to form our pair…

The first item is yield. That’s easy.

The second item is the collection of sequences with yield removed. That’s p added back onto the front of q, so we use cons.

(cons p q)

Remember that q is a collection of sequences sorted by ‘first item’. We don’t know that (cons p q) is sorted according to first item. It’s likely that the next result is in p or in (first q). However, we defer this sort operation to the next iteration, because re-sorting is the first thing we do in each iteration.

By the way, if p is nil then p is exhausted and we’ll just take q, hence this tweak :-

(if p (cons p q) q)
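Using the same made-up values, the remainder looks like this in each of the two cases :-

;; p still has items left, so we cons it back onto the front of q
user> (cons '(5 9) '((2 2 7) (3 4 8)))
((5 9) (2 2 7) (3 4 8))

;; p is exhausted (nil), so the remainder is just q
user> (let [p nil, q '((2 2 7) (3 4 8))]
        (if p (cons p q) q))
((2 2 7) (3 4 8))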

Putting this all together, our final next-item function is as follows :-

(fn [colls]
    (let [[[yield & p] & q] 
          (sort-by first comp colls)]
        [yield (if p (cons p q) q)]))

Repeat as necessary

Each time we run the next-item function we’ll get the next result in our merged sorted sequence. Sounds like another job for Clojure’s iterate function!

Our next-item function needs to be amended slightly so that it can work iteratively – it needs to accept the same kind of structure that it produces. We amend the function signature so it can take a pair as the argument, and we destructure to ignore the first item and label the second.

(fn [[_ colls]]
    (let [[[yield & p] & q] 
          (sort-by first comp colls)]
        [yield (if p (cons p q) q)]))

We’ll just have to initiate the iteration with a fake pair.

[:begin colls]

And we’ll have to ensure the iteration can tell the caller when to stop.

[:end nil]

Let’s build that ‘stop’ into our function :-

(next-item [[_ colls]]
  (if (nil? colls) [:end nil]
    (let [[[yield & p] & q] 
          (sort-by first comp colls)]
      [yield (if p (cons p q) q)])))
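To see what the iteration produces, here’s a hypothetical REPL session using a stand-alone copy of next-item (it normally lives inside merge-sort’s letfn) with comp fixed to compare, applied to two tiny sorted collections :-

user> (defn next-item [[_ colls]]
        (if (nil? colls)
          [:end nil]
          (let [[[yield & p] & q]
                (sort-by first compare colls)]
            [yield (if p (cons p q) q)])))
#'user/next-item
user> (take 6 (iterate next-item [:begin '((1 4) (2 3))]))
([:begin ((1 4) (2 3))] [1 ((4) (2 3))] [2 ((3) (4))] [3 ((4))] [4 nil] [:end nil])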

Now that the inner function is finished, let’s move on to the main body of the merge-sort function.

(->> colls
     (vector :begin)
     (iterate next-item)
     (drop 1)
     (map first)
     (take-while (partial not= :end)))

We start by taking the colls argument and applying the ->> threading macro to it. This means that the result of each step will be passed as the last argument of the next step.

(->> colls
    ... )

As we discussed earlier, we have to compose the first ‘fake’ pair to get the first iteration to work.

(vector :begin) 

This creates [:begin colls]. We should remember this is also the first item that iterate will yield so we’ll have to drop it later.

Now let’s run our next-item function (infinitely)

(iterate next-item)

We don’t want the first ‘fake’ result – it was just to help the iterate function, let’s drop it :-

(drop 1)

We’re only interested in the yield values (the first in each pair) :-

(map first)

And we need to continue until the stop marker :-

(take-while (partial not= :end))

Finally we overload the merge-sort function to use a default comparator where it isn’t specified.
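For example (with made-up data), either arity can be used. Note that a custom comparator must be the same one used to pre-sort the input collections :-

user> (merge-sort '((1 4 9) (2 3 5) (6 7 8)))
(1 2 3 4 5 6 7 8 9)
user> (merge-sort #(compare %2 %1) '((9 4 1) (5 3 2)))
(9 5 4 3 2 1)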

Testing

Let’s define a function that can create n collections, each of size m, populated with random numbers.

user> (defn make-colls [n m]
  (for [i (range n)]
    (sort
     (for [j (range m)]
       (rand-int 1000000)))))

Let’s evaluate this now so the structure is in memory and its creation doesn’t affect our merge-sort performance measurements (the doall forces the lazy sequence to be realised).

user> (def colls (doall (make-colls 100 20000)))

Let’s test it to ensure that it’s working. We expect the first collection to be sorted.

user> (take 10 (first colls))

(15 33 88 94 114 119 148 164 189 190)

However, when we merge sort, the numbers from all the collections are merged so we get a less rapidly incrementing sequence of numbers.

user> (take 10 (merge-sort colls))    

(1 1 1 2 2 5 6 7 7 8)
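As a further sanity check (not part of the original timings), the merged output should be identical to an ordinary in-memory sort of all the numbers concatenated together :-

user> (= (merge-sort colls) (sort (apply concat colls)))
true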

Actually merge-sorting is quite useful

Sorting algorithms are often used as learning exercises, especially in functional languages. But how useful is sorting for tackling business problems?

It turns out we use merge-sort in multiple places in our Clojure system. The first use was for grouping risk data so it could be collated on a trade-by-trade basis. Then we decided to rewrite an old Java-based scheduler that was expensive to maintain. We now have a lazy scheduler (we call it ‘Chime’) that uses this lazy merge-sort to merge multiple schedules of repeating tasks into a single schedule. I’ll say more about Chime in a future post.