An empirical study of group-by strategies in Julia

Published Oct 28, 2017 · Last updated Nov 19, 2017

Update 2017 Nov 12

  • Detailed the radixgroup method, which is faster than data.table for bits types.
  • The multi-threaded radixsort method is the fastest method known so far.

About me

I am a Data Scientist with 10+ years of experience in Credit Risk Data Science/Analytics/Modelling, hoping to join Julia Computing full time to work on data-related projects such as JuliaDB.jl and DataFrames.jl.

The group-by data manipulation problem

Julia is meant to be fast, but upon initial investigation R's data.table is streets ahead of Julia in terms of group-by performance. So I wanted to investigate whether Julia can beat R's data.table with more optimized algorithms and data structures.

The TL;DR version is that

  • Julia is faster than data.table if the group-by vector is of "bits type", e.g. Int and Float64 but not String
  • The multi-threaded radixsort method is the fastest method known so far
  • Julia using indexed (sorted) columns is faster than data.table's pre-indexed group-by
  • Julia is much slower than data.table if the group-by vector is a string
  • Group-by using CategoricalArrays is much faster than data.table
  • Parallelized algorithms are slower, probably due to my inexperience in building these algorithms

The code

The code used to produce the timings can be found at https://gist.github.com/xiaodaigh/fbdbbfd32062e33a20b1895af24e1542

Focus on sumby with large number of groups

After an initial study of the performance of group-by algorithms, I found the biggest performance gap between Julia and data.table is the sumby case where the number of groups is large relative to the length of the group-by vector, e.g. 2_500_000 groups for a group-by vector of length 250_000_000.

I figured if I can solve this efficiently then it will teach me a lot about how to tackle other cases, hence my focus on this problem at this stage.

Let's illustrate the sumby algorithm with this example

byvec  = [100, 100, 200, 300, 400, 400]
valvec = [1,   1,   1,   1,   1,   1]

sumby(byvec, valvec) 
# should return Dict(100 => 2, 200 => 1, 300 => 1, 400 => 2)

This version of sumby is essentially what StatsBase.countmap implements:

using StatsBase
import StatsBase: countmap, weights

byvec  = [100, 100, 200, 300, 400, 400]
valvec = [1,   1,   1,   1,   1,   1]
countmap(byvec, weights(valvec))

However, I found that countmap is not as performant. Initially I implemented my own version of sumby by studying the source code of StatsBase.countmap and SplitApplyCombine.groupreduce, but later I found that radix-sort-based methods are the fastest, and hence based sumby on radix sort.

Type considerations

I learnt that to make Julia code run fast we need to tell Julia the type of our variables. This can be problematic for DataFrames and functions that work on DataFrames' columns, as the type of each column is unknown until it is used. However, I can focus on creating functions that accept vectors with known types and optimize those first (see Chris Rackauckas' comment). I found there is not much overhead in picking a column out of a DataFrame and treating it as a vector, hence I have chosen to create algorithms for sumby with the following function signature first

function sumby(by::Vector{T}, val::Vector{S}) where {T, S}
 # some code...
end

Benchmark - data setup

I ran the following code to setup the test data in Julia

using SplitApplyCombine, FastGroupBy, BenchmarkTools

const N = Int64(2e9/8)
const K = 100

id6 = rand(1:Int64(round(N/K)), N)                            # the integer group-by vector
pool1 = [@sprintf("id%010d", k) for k in 1:Int64(round(N/K))] # pool of unique strings
id3 = rand(pool1, N)                                          # the string group-by vector
v1  = rand(1:5, N)                                            # the value vector, ints in [1, 5]

and in R I simply run

library(data.table)
N = 2e9/8
K = 100

DT <- data.table(
  id6 = sample(N/K, N, TRUE),                         # small groups (int)
  id3 = sample(sprintf("id%010d", 1:(N/K)), N, TRUE), # small groups (string)
  v1  = sample(5, N, TRUE)                            # int in range [1,5]
)

Radixsort-based algorithms

It is well known that data.table uses radix sort as its default sorting algorithm, so I looked into whether radix sort can help speed up group-by operations. Sure enough, there is an implementation of radix sort in SortingAlgorithms.jl on which I can base my algorithms.

Radixsort and Radixgroup

I took the source code of the radix sort function and repurposed it so that it sorts the group-by vector and the value vector together in the same function. This is the radixsort method.

Technically sumby does not require the output to be sorted, hence it may be possible to build a more performant sumby algorithm by avoiding a full sort. To understand how this can be achieved, it helps to have a high-level understanding of radix sort.

A high-level explanation of radix sort as implemented in SortingAlgorithms.jl:

  1. Radix sort first sorts the array using a counting sort on the first 11 binary digits (11 is SortingAlgorithms.RADIX_SIZE), so there are only 2048 = 2^11 possible values for those digits.
  2. The counting sort loops through the elements and counts how many fall into each of the 2048 buckets; then
  3. it loops through the elements again and uses the counts to place each element at its appropriate position in a newly created array of the same size. This is done by building a cumulative sum of the counts, assigning each element to the new array at the position given by the cumulative sum for its bucket, decrementing that cumulative sum, and continuing until all elements have been placed.
  4. The original array is "thrown away" and the process repeats on the next set of 11 binary digits of the new array, continuing until all digits have been used; it can be shown that the resulting array is sorted at that point.

The insight here is that once the counting sort on the first 11 binary digits is done (steps 2–3 above), we have already broken the array up into as many as 2048 buckets; we need not sort the whole array again, and can simply apply a sumby within each of the 2048 buckets (some of which may be empty), knowing that two elements in different buckets can never end up in the same by-group. Using this idea and a couple of optimizations I have implemented a sumby_radixgroup method, which is faster than the sumby_radixsort method on large inputs. A simplified sketch of the idea follows.
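To make this concrete, below is a much-simplified sketch of the radixgroup idea. It is not the FastGroupBy.jl implementation; for brevity it assumes non-negative integer keys and only groups on the lowest 11 bits before summing, whereas the real method includes further optimizations.

# Simplified sketch of the radixgroup idea; assumes non-negative integer keys
function sumby_radixgroup_sketch(by::Vector{<:Integer}, val::Vector{<:Number})
    RADIX_SIZE = 11
    nbuckets = 1 << RADIX_SIZE            # 2048 possible values of the low 11 bits
    mask = nbuckets - 1

    # pass 1: count how many elements fall into each bucket
    counts = zeros(Int, nbuckets)
    for b in by
        counts[(b & mask) + 1] += 1
    end

    # pass 2: use cumulative counts to place each element within its bucket's range
    offsets = cumsum(counts)
    by2, val2 = similar(by), similar(val)
    for i in length(by):-1:1
        j = (by[i] & mask) + 1
        by2[offsets[j]] = by[i]
        val2[offsets[j]] = val[i]
        offsets[j] -= 1
    end

    # elements in different buckets can never share a by-group,
    # so accumulate the sums one bucket at a time
    res = Dict{eltype(by), eltype(val)}()
    lo = 1
    for j in 1:nbuckets
        for i in lo:(lo + counts[j] - 1)
            res[by2[i]] = get(res, by2[i], zero(eltype(val))) + val2[i]
        end
        lo += counts[j]
    end
    return res
end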

Benchmark - Radixsort-based algorithms

The radixsort and radixgroup versions of sumby are implemented in FastGroupBy.jl

using FastGroupBy
sumby_radixsort(id6, v1)
sumby_radixgroup(id6, v1)

The radixgroup sumby takes about 14.9 seconds, which is faster than data.table! The radixsort sumby takes about 18.9 seconds, which is on par with data.table.

In data.table one can index on a column, sacrificing some set-up time for lightning-fast performance afterwards. This can be achieved as below

setkey(DT, id6)
DT[,sum(v1), id6]

And the timing is ~2-3 seconds. However, the radixsort method in Julia outputs the original vectors in sorted order, similar to setkey above. I can then apply a faster algorithm that assumes elements belonging to the same group are organised contiguously (being sorted is a special case of this condition) to do the group sum in ~1-2 seconds, beating data.table's indexed method!
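A minimal sketch of such a contiguous-group sum is below. It is a hypothetical helper for illustration, not the FastGroupBy.jl API: because equal keys sit next to each other after the radix sort (or after setkey), a single linear scan with no hashing and no further sorting is enough.

# Sketch: sum val by group when equal keys are stored contiguously (e.g. after sorting)
function sumby_contiguous(by::AbstractVector, val::AbstractVector{<:Number})
    res = Dict{eltype(by), eltype(val)}()
    isempty(by) && return res
    cur_key, cur_sum = by[1], zero(eltype(val))
    for i in eachindex(by, val)
        if by[i] == cur_key
            cur_sum += val[i]
        else
            res[cur_key] = cur_sum      # close off the previous run of equal keys
            cur_key, cur_sum = by[i], val[i]
        end
    end
    res[cur_key] = cur_sum              # record the final run
    return res
end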

However, radix sort cannot deal with String as it is not a bits type. One way around this is to hash each string using hash(a_string), sort on the hashed results, and then check for collisions in the summation. However hash.(id3), where id3 is a vector of length 250_000_000, took more than 1 minute; this is the lower bound on the speed of this algorithm, as you must hash every string.
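For illustration only, a rough sketch of this hash-then-sort idea is below. The collision check here is deliberately crude, and I am assuming that sumby_radixsort is happy to accept the UInt hash vector as its group-by argument.

# Rough sketch of the hash-based workaround for String keys (illustration only)
function sumby_hashed_sketch(by::Vector{String}, val::Vector{<:Number})
    h = hash.(by)                      # hashing every string dominates the run time
    # keep one representative string per hash and crudely flag any collision
    rep = Dict{UInt, String}()
    for (hi, s) in zip(h, by)
        s0 = get!(rep, hi, s)
        s0 == s || error("hash collision between $s0 and $s")
    end
    hres = sumby_radixsort(h, val)     # the hashes are a bits type, so radix sort applies
    return Dict(rep[k] => v for (k, v) in hres)
end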

In data.table, the sum by id3 only took about 35 seconds (pre-indexed it is about ~3 seconds), while the same in Julia using the hashed radixsort took 110 seconds, so the performance gap for string group-by is still fairly significant, unless you store the strings as a CategoricalArray (see benchmarks further down).

Benchmark - Multi-threaded radixsort

One obvious way to speed up the code is to apply sumby on mutually exclusive subarrays and aggregate the results using multi-threaded code. I found that the multi-threaded version of the radixsort sumby is more performant than radixgroup.

On my machine the default number of threads is 4, and the timing was 10.5 seconds, which is a lot faster than R's data.table!

The function sumby_multi_rs is implemented in the FastGroupBy.jl package. The function blockranges and the overall structure were copied from this StackOverflow question.

function sumby_multi_rs(by::AbstractVector{T}, val::AbstractVector{S})::Dict{T,S} where {T, S<:Number}
    (starts, ends) = blockranges(nthreads(), length(by))
    chunks = collect(zip(starts, ends))
    # pre-allocate one result slot per chunk; writing res[i] from each thread is safe,
    # whereas push!-ing onto a shared vector from multiple threads is a data race
    res = [Dict{T,S}() for _ in 1:length(chunks)]
    @threads for i in 1:length(chunks)
        (ss, ee) = chunks[i]
        res[i] = sumby_radixsort(by[ss:ee], val[ss:ee])
    end

    # merge the per-chunk dictionaries into a single result
    szero = zero(S)
    res_fnl = res[1]
    for res1 in res[2:end]
        for k in keys(res1)
            res_fnl[k] = get(res_fnl, k, szero) + res1[k]
        end
    end
    return res_fnl
end

Benchmarks - dictionary based algorithm

The first algorithm that I came up with (which is independently implemented in SplitApplyCombine.groupreduce) is to enumerate through zip(by, val) and build up a hash table where the keys are the unique values of by and the corresponding values are the cumulative sums of val. This algorithm works for any data type; a minimal sketch is shown below.
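A minimal sketch of this dictionary-based approach (my own illustration, not the SplitApplyCombine source):

# Sketch: one pass over zip(by, val), accumulating running sums in a Dict
function sumby_dict(by::AbstractVector{T}, val::AbstractVector{S}) where {T, S<:Number}
    res = Dict{T, S}()
    for (byi, vali) in zip(by, val)
        res[byi] = get(res, byi, zero(S)) + vali
    end
    return res
end

sumby_dict([100, 100, 200, 300, 400, 400], [1, 1, 1, 1, 1, 1])
# Dict(100 => 2, 200 => 1, 300 => 1, 400 => 2)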

One can also use DataStructures.jl's Accumulators to achieve this, but I found SplitApplyCombine's implementation to be more performant.

The timings for Julia vs data.table is 46 seconds vs 20 seconds.

The timings in R might be a bit misleading as I find that R will hang for a few seconds before I can issue the next command, possibly due to some garbage collection in the background.

Benchmark - algorithms that require prior knowledge

I know that id6 contains the integers 1 to 2_500_000; can I somehow use that knowledge to help? The answer is yes, as I can simply create an array of size 2_500_000 to store the results. Accessing an element of an array and adding to it should be a lot faster than hashing (however fast the hashing may be); also, you need not resize the array (which I assume to be expensive) nor manage the overheads of a Dict.

function sumby_intrange(by::Vector{T}, val::Vector{S}, bylow::T, byhigh::T) where {T<:Integer, S}
  lby = byhigh - bylow + 1
  res = zeros(S, lby)          # the result type should match the value type, not the key type

  for (byi, vali) in zip(by, val)
    @inbounds res[(byi - bylow) + 1] += vali
  end
  # lazily drop the groups that never occurred
  Iterators.filter(x -> x[2] != 0, zip(bylow:byhigh, res))
end
@time res = sumby_intrange(id6, v1, 1, 2_500_000);

The timing for Julia is now 7 seconds.

However, this approach is "cheating", but it also shows glimpses of hope. Of course in real-life settings we may not know anything about the data until we first interrogate it, but there is no reason why we shouldn't keep metadata about our dataset once we've gathered it.

Implications for persistent data manipulation framework design

Therefore I think JuliaDB should actively store metadata about each column and provide functionality to profile the data. The more you know about the data, the easier it is to tailor the approach to achieve peak performance.

The other point worth mentioning is that it's not obvious how to "cheat" like this in the R world. It's harder in R to define your own vector type, and you would probably need to go to C/C++ to code such an algorithm and link it to R. The advantage of Julia is that a novice Julia coder like myself can roll a fairly performant solution very quickly.

Benchmark - PooledArrays (and therefore CategoricalArrays)

Both PooledArrays.jl and CategoricalArrays.jl store a pool array representing the unique levels and use integers pointing into the pool to represent the vector. We can apply the trick above to perform sumby relatively quickly. But it requires creating the vector as a CategoricalArray/PooledArray in the first place, which requires prior knowledge; also, converting an array to a PooledArray is quite slow at the moment.

function sumby(by::Union{PooledArray, CategoricalArray}, val::AbstractVector{S}) where S
  l = length(by.pool)
  res = zeros(S, l)
  refs = Int64.(by.refs)

  for (i, v) in zip(refs, val)
    @inbounds res[i] += v
  end
  return Dict(by.pool[i] => res[i] for i in 1:l)
end

const N = Int64(2e9/8); const K = 100

using PooledArrays
import PooledArrays.PooledArray
pool = [1:Int64(round(N/K))...]

@time id6_pooled = PooledArray(PooledArrays.RefArray(rand(Int32(1):Int32(round(N/K)), N)), pool)
@time sumby(id6_pooled, v1)

The timing is similar, at about 9 seconds.

The above optimization can be performed in data.table as well, and I have initiated a feature request on its GitHub page.

Benchmarks - Parallelized sum

In Julia one can use SharedArrays and write algorithms that work on chunks of the array. In this case we can let n workers each work on a distinct range of the arrays. The workers collectively return n Dicts, which we can then reduce into one Dict. Although Chris' comment pointed out that this is not optimal, I do not yet fully understand why that might be the case, and this is the best approach I can come up with for now.

However, the time taken to "warm up" can be large, as it requires converting the arrays to SharedArrays, and it takes a bit of time to fire up the Julia workers using addprocs(). During the run it can also make other concurrent tasks (e.g. watching JuliaCon videos on YouTube) sluggish.

@time addprocs() # create Julia workers
@time using FastGroupBy
@everywhere using FastGroupBy
@everywhere using SplitApplyCombine
@time psumby(id6,v1) # 35 seconds

The timing is ~35 seconds

If you pre-convert the vectors into SharedArrays then you can speed it up to around ~29 seconds, which is still slower than the radixsort method. I would say parallel processing is not as beginner-friendly.

@time as = SharedArray(id6);
@time bs = SharedArray(v1);

@time psumby(as,bs)
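For illustration, the chunk-and-merge idea described above might look roughly like the sketch below. This is not the psumby implementation; it assumes workers have already been added with addprocs() and that as and bs are the SharedArrays built just above.

using Distributed, SharedArrays   # on older Julia versions these live in Base

# Sketch: each worker sums one contiguous chunk into its own Dict; the Dicts are merged at the end
function psumby_sketch(by::SharedArray, val::SharedArray)
    n, nw = length(by), nworkers()
    ranges = [((i - 1) * n ÷ nw + 1):(i * n ÷ nw) for i in 1:nw]   # one chunk per worker
    dicts = pmap(ranges) do rng
        d = Dict{eltype(by), eltype(val)}()
        for i in rng
            d[by[i]] = get(d, by[i], zero(eltype(val))) + val[i]
        end
        d
    end
    return merge(+, dicts...)   # add up values for keys that appear in more than one chunk
end

@time psumby_sketch(as, bs)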

Conclusion

Julia's promise is clear, as a beginner like myself can roll a few novel solutions and test them out. This is not easily achievable in R, where you would need C/C++ knowledge to build your own solutions. (Update: I have managed to match data.table's performance on bits-type group-by.) However, I am starting to have doubts about whether Julia really does solve the "two language" problem: you can optimize Julia's performance, but without resorting to C can it really beat data.table eventually? Time will tell. I feel that getting to within a factor of 2 is not good enough, as many of my data manipulation steps already take 5 minutes, so extending that to 10 minutes is not an experience I would tolerate for daily usage.

As PCs get more and more powerful, we will start to see laptop/desktop computers with 1 TB of RAM become commonplace for data manipulation in several years' time. Therefore many tasks that would have been considered "big data" (or just "medium data") will gradually become "small data" problems, and Julia's superior performance will truly shine in such use cases. As it stands there are a few spots where Julia does not have fantastic performance, but I am fairly confident that many of these problems can be solved in Julia without having to resort to C/C++.

One thing I found is that you need lots of time and patience to dig into Julia's ecosystem to find the gems that can help you optimize performance. The community hasn't yet coalesced around a couple of tried-and-tested packages (like dplyr and data.table), which is symptomatic of two things:

  • Julia's data ecosystem is still relatively immature; and
  • Julia is quite a bit more flexible than R in terms of allowing the average user to roll their own performant solutions; hence there are a lot of distinct solutions that try to solve the same problems in different ways. It will take time for clear winners to emerge, and when they do I am sure they will be excellent solutions; but the current state is not that beginner-friendly.

Overall I am very optimistic about the future of Julia, because I see many smart people working hard in the data domain, and hopefully one day I can wholeheartedly recommend Julia as THE data manipulation platform. But for now that recommendation still goes to R with data.table, because it is still faster in many cases and has a richer data ecosystem around it.
